diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 36daf72144b01..5a82d7f49f27c 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -1208,3 +1208,25 @@ jobs:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
mvn test -Punit-tests -Djava17 -Djava.version=17 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -pl hudi-examples/hudi-examples-flink $MVN_ARGS
+
+ test-hudi-trino-plugin:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+ - name: Set up JDK 23
+ uses: actions/setup-java@v3
+ with:
+ # Note: We are not caching here again, as we want to use the .m2 repository populated by
+ # the previous step
+ java-version: '23'
+ distribution: 'temurin'
+ architecture: x64
+ - name: Build hudi-trino-plugin with JDK 23
+ working-directory: ./hudi-trino-plugin
+ run:
+ mvn clean install -DskipTests
+ - name: Test hudi-trino-plugin with JDK 23
+ working-directory: ./hudi-trino-plugin
+ run:
+ mvn test
diff --git a/hudi-trino-plugin/.mvn/modernizer/violations-production-code-only.xml b/hudi-trino-plugin/.mvn/modernizer/violations-production-code-only.xml
new file mode 100644
index 0000000000000..4653d63fafc28
--- /dev/null
+++ b/hudi-trino-plugin/.mvn/modernizer/violations-production-code-only.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+<modernizer>
+    <violation>
+        <name>java/util/concurrent/ThreadPoolExecutor."&lt;init&gt;":(IIJLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/BlockingQueue;)V</name>
+        <version>1.1</version>
+        <comment>Use constructor that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/ThreadPoolExecutor."&lt;init&gt;":(IIJLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/BlockingQueue;Ljava/util/concurrent/RejectedExecutionHandler;)V</name>
+        <version>1.1</version>
+        <comment>Use constructor that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/ScheduledThreadPoolExecutor."&lt;init&gt;":(I)V</name>
+        <version>1.1</version>
+        <comment>Use constructor that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/ScheduledThreadPoolExecutor."&lt;init&gt;":(ILjava/util/concurrent/RejectedExecutionHandler;)V</name>
+        <version>1.1</version>
+        <comment>Use constructor that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newFixedThreadPool:(I)Ljava/util/concurrent/ExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newWorkStealingPool:()Ljava/util/concurrent/ExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newWorkStealingPool:(I)Ljava/util/concurrent/ExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newSingleThreadExecutor:()Ljava/util/concurrent/ExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newCachedThreadPool:()Ljava/util/concurrent/ExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newSingleThreadScheduledExecutor:()Ljava/util/concurrent/ScheduledExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/concurrent/Executors.newScheduledThreadPool:(I)Ljava/util/concurrent/ScheduledExecutorService;</name>
+        <version>1.1</version>
+        <comment>Use factory method that takes ThreadFactory and name the threads</comment>
+    </violation>
+</modernizer>
diff --git a/hudi-trino-plugin/.mvn/modernizer/violations.xml b/hudi-trino-plugin/.mvn/modernizer/violations.xml
new file mode 100644
index 0000000000000..eae8ea39b5916
--- /dev/null
+++ b/hudi-trino-plugin/.mvn/modernizer/violations.xml
@@ -0,0 +1,365 @@
+<?xml version="1.0"?>
+<modernizer>
+    <violation>
+        <name>java/lang/Class.newInstance:()Ljava/lang/Object;</name>
+        <version>1.1</version>
+        <comment>Prefer Class.getConstructor().newInstance()</comment>
+    </violation>
+
+    <violation>
+        <name>java/lang/String."&lt;init&gt;":([B)V</name>
+        <version>1.1</version>
+        <comment>Prefer new String(byte[], Charset)</comment>
+    </violation>
+
+    <violation>
+        <name>java/lang/String.getBytes:()[B</name>
+        <version>1.1</version>
+        <comment>Prefer String.getBytes(Charset)</comment>
+    </violation>
+
+    <violation>
+        <name>java/lang/String.toString:()Ljava/lang/String;</name>
+        <version>1.1</version>
+        <comment>Call to toString() is redundant</comment>
+    </violation>
+
+    <violation>
+        <name>java/io/File.toString:()Ljava/lang/String;</name>
+        <version>1.1</version>
+        <comment>Prefer File.getPath()</comment>
+    </violation>
+
+    <violation>
+        <name>java/lang/Thread$Builder.factory:()Ljava/util/concurrent/ThreadFactory;</name>
+        <version>1.1</version>
+        <comment>Use io.airlift.concurrent.Threads's thread factories, as they set the thread context class loader</comment>
+    </violation>
+
+    <violation>
+        <name>java/lang/Thread$Builder$OfPlatform.factory:()Ljava/util/concurrent/ThreadFactory;</name>
+        <version>1.1</version>
+        <comment>Use io.airlift.concurrent.Threads's thread factories, as they set the thread context class loader</comment>
+    </violation>
+
+    <violation>
+        <name>java/lang/Thread$Builder$OfVirtual.factory:()Ljava/util/concurrent/ThreadFactory;</name>
+        <version>1.1</version>
+        <comment>Use io.airlift.concurrent.Threads's thread factories, as they set the thread context class loader</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/primitives/Ints.checkedCast:(J)I</name>
+        <version>1.8</version>
+        <comment>Prefer Math.toIntExact(long)</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableMap$Builder.build:()Lcom/google/common/collect/ImmutableMap;</name>
+        <version>1.8</version>
+        <comment>Use buildOrThrow() instead, as it makes it clear that it will throw on duplicated values</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableTable$Builder.build:()Lcom/google/common/collect/ImmutableTable;</name>
+        <version>1.8</version>
+        <comment>Use buildOrThrow() instead, as it makes it clear that it will throw on duplicated values</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableBiMap$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableList$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableMap$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableMultimap$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableMultiset$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableSet$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableSortedMap$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use orderedBy() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableSortedSet$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use orderedBy() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/collect/ImmutableTable$Builder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use builder() static factory method instead</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/cache/CacheBuilder.build:()Lcom/google/common/cache/Cache;</name>
+        <version>1.8</version>
+        <comment>Guava Cache has concurrency issues around invalidation and ongoing loads. Use EvictableCacheBuilder or SafeCaches to build caches.
+            See https://github.com/trinodb/trino/issues/10512 for more information and see https://github.com/trinodb/trino/issues/10512#issuecomment-1016221168
+            for why Caffeine does not solve the problem.</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/common/cache/CacheBuilder.build:(Lcom/google/common/cache/CacheLoader;)Lcom/google/common/cache/LoadingCache;</name>
+        <version>1.8</version>
+        <comment>Guava LoadingCache has concurrency issues around invalidation and ongoing loads. Use EvictableCacheBuilder or SafeCaches to build caches.
+            See https://github.com/trinodb/trino/issues/10512 for more information and see https://github.com/trinodb/trino/issues/10512#issuecomment-1016221168
+            for why Caffeine does not solve the problem.</comment>
+    </violation>
+
+    <violation>
+        <name>org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;)V</name>
+        <version>1.8</version>
+        <comment>Use AssertJ or QueryAssertions due to TestNG #543</comment>
+    </violation>
+
+    <violation>
+        <name>org/testng/Assert.assertEquals:(Ljava/lang/Iterable;Ljava/lang/Iterable;Ljava/lang/String;)V</name>
+        <version>1.8</version>
+        <comment>Use AssertJ or QueryAssertions due to TestNG #543</comment>
+    </violation>
+
+    <violation>
+        <name>org/testng/Assert.assertThrows:(Lorg/testng/Assert$ThrowingRunnable;)V</name>
+        <version>1.8</version>
+        <comment>Use AssertJ's assertThatThrownBy, see https://github.com/trinodb/trino/issues/5320 for rationale</comment>
+    </violation>
+
+    <violation>
+        <name>org/testng/Assert.assertThrows:(Ljava/lang/Class;Lorg/testng/Assert$ThrowingRunnable;)V</name>
+        <version>1.8</version>
+        <comment>Use AssertJ's assertThatThrownBy, see https://github.com/trinodb/trino/issues/5320 for rationale</comment>
+    </violation>
+
+    <violation>
+        <name>com/amazonaws/services/glue/model/Table.getStorageDescriptor:()Lcom/amazonaws/services/glue/model/StorageDescriptor;</name>
+        <version>1.1</version>
+        <comment>Storage descriptor is nullable in Glue model, which is too easy to forget about. Prefer GlueToTrinoConverter.getStorageDescriptor</comment>
+    </violation>
+
+    <violation>
+        <name>com/amazonaws/services/glue/model/Table.getTableType:()Ljava/lang/String;</name>
+        <version>1.1</version>
+        <comment>Table type is nullable in Glue model, which is too easy to forget about. Prefer GlueToTrinoConverter.getTableType</comment>
+    </violation>
+
+    <violation>
+        <name>com/amazonaws/services/glue/model/Column.getParameters:()Ljava/util/Map;</name>
+        <version>1.1</version>
+        <comment>Column parameters map is nullable in Glue model, which is too easy to forget about. Prefer GlueToTrinoConverter.getColumnParameters</comment>
+    </violation>
+
+    <violation>
+        <name>com/amazonaws/services/glue/model/Table.getParameters:()Ljava/util/Map;</name>
+        <version>1.1</version>
+        <comment>Table parameters map is nullable in Glue model, which is too easy to forget about. Prefer GlueToTrinoConverter.getTableParameters</comment>
+    </violation>
+
+    <violation>
+        <name>com/amazonaws/services/glue/model/Partition.getParameters:()Ljava/util/Map;</name>
+        <version>1.1</version>
+        <comment>Partition parameters map is nullable in Glue model, which is too easy to forget about. Prefer GlueToTrinoConverter.getPartitionParameters</comment>
+    </violation>
+
+    <violation>
+        <name>com/amazonaws/services/glue/model/SerDeInfo.getParameters:()Ljava/util/Map;</name>
+        <version>1.1</version>
+        <comment>SerDeInfo parameters map is nullable in Glue model, which is too easy to forget about. Prefer GlueToTrinoConverter.getSerDeInfoParameters</comment>
+    </violation>
+
+    <violation>
+        <name>org/apache/hadoop/fs/FileSystem.close:()V</name>
+        <version>1.1</version>
+        <comment>Hadoop FileSystem instances are shared and should not be closed</comment>
+    </violation>
+
+    <violation>
+        <name>java/util/TimeZone.getTimeZone:(Ljava/lang/String;)Ljava/util/TimeZone;</name>
+        <version>1.8</version>
+        <comment>Avoid TimeZone.getTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(..)) instead, or TimeZone.getTimeZone(..., false).</comment>
+    </violation>
+
+    <violation>
+        <name>org/joda/time/DateTimeZone.toTimeZone:()Ljava/util/TimeZone;</name>
+        <version>1.8</version>
+        <comment>Avoid DateTimeZone.toTimeZone as it returns GMT for a zone not supported by the JVM. Use TimeZone.getTimeZone(ZoneId.of(dtz.getId())) instead.</comment>
+    </violation>
+
+    <violation>
+        <name>com/esri/core/geometry/ogc/OGCGeometry.equals:(Lcom/esri/core/geometry/ogc/OGCGeometry;)Z</name>
+        <version>1.6</version>
+        <comment>Prefer OGCGeometry.Equals(OGCGeometry)</comment>
+    </violation>
+
+    <violation>
+        <name>com/esri/core/geometry/ogc/OGCGeometry.equals:(Ljava/lang/Object;)Z</name>
+        <version>1.6</version>
+        <comment>Prefer OGCGeometry.Equals(OGCGeometry)</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize."&lt;init&gt;":(DLio/airlift/units/DataSize$Unit;)V</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.units.DataSize.of(long, DataSize.Unit)</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize.succinctDataSize:(DLio/airlift/units/DataSize$Unit;)Lio/airlift/units/DataSize;</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.units.DataSize.of(long, DataSize.Unit).succinct() -- Note that succinct conversion only affects toString() results</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize.getValue:()D</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.units.DataSize.toBytes() and Unit.inBytes() for conversion</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize.getValue:(Lio/airlift/units/DataSize$Unit;)D</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.units.DataSize.toBytes() and Unit.inBytes() for conversion</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize.roundTo:(Lio/airlift/units/DataSize$Unit;)J</name>
+        <version>1.8</version>
+        <comment>Method is deprecated for removal</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize.convertTo:(Lio/airlift/units/DataSize$Unit;)Lio/airlift/units/DataSize;</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.units.DataSize.to(DataSize.Unit)</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/units/DataSize.convertToMostSuccinctDataSize:()Lio/airlift/units/DataSize;</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.units.DataSize.succinct()</comment>
+    </violation>
+
+    <violation>
+        <name>io/airlift/testing/Closeables.closeQuietly:([Ljava/io/Closeable;)V</name>
+        <version>1.0</version>
+        <comment>Use Closeables.closeAll() or Closer.</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/inject/util/Modules.combine:(Ljava/lang/Iterable;)Lcom/google/inject/Module;</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.configuration.ConfigurationAwareModule.combine</comment>
+    </violation>
+
+    <violation>
+        <name>com/google/inject/util/Modules.combine:([Lcom/google/inject/Module;)Lcom/google/inject/Module;</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.configuration.ConfigurationAwareModule.combine</comment>
+    </violation>
+
+    <violation>
+        <name>io/jsonwebtoken/Jwts.builder:()Lio/jsonwebtoken/JwtBuilder;</name>
+        <version>1.8</version>
+        <comment>Use io.trino.server.security.jwt.JwtsUtil or equivalent</comment>
+    </violation>
+
+    <violation>
+        <name>io/jsonwebtoken/Jwts.parserBuilder:()Lio/jsonwebtoken/JwtParserBuilder;</name>
+        <version>1.8</version>
+        <comment>Use io.trino.server.security.jwt.JwtsUtil or equivalent</comment>
+    </violation>
+
+    <violation>
+        <name>org/openjdk/jol/info/ClassLayout.instanceSize:()J</name>
+        <version>1.8</version>
+        <comment>Use io.airlift.slice.SizeOf.instanceSize</comment>
+    </violation>
+
+    <violation>
+        <name>org/testng/annotations/BeforeTest</name>
+        <version>1.8</version>
+        <comment>Prefer org.testng.annotations.BeforeClass</comment>
+    </violation>
+
+    <violation>
+        <name>org/testng/annotations/AfterTest</name>
+        <version>1.8</version>
+        <comment>Prefer org.testng.annotations.AfterClass</comment>
+    </violation>
+
+    <violation>
+        <name>com/fasterxml/jackson/core/JsonFactory."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use io.trino.plugin.base.util.JsonUtils.jsonFactory()</comment>
+    </violation>
+
+    <violation>
+        <name>com/fasterxml/jackson/core/JsonFactoryBuilder."&lt;init&gt;":()V</name>
+        <version>1.8</version>
+        <comment>Use io.trino.plugin.base.util.JsonUtils.jsonFactoryBuilder() instead</comment>
+    </violation>
+
+    <violation>
+        <name>software/amazon/awssdk/services/glue/model/Table.tableType:()Ljava/lang/String;</name>
+        <version>1.8</version>
+        <comment>Table type is nullable in Glue model, which is too easy to forget about. Prefer GlueConverter.getTableTypeNullable</comment>
+    </violation>
+
+    <violation>
+        <name>org/assertj/core/util/Files.newTemporaryFile:()Ljava/io/File;</name>
+        <version>1.1</version>
+        <comment>Use @TempDir instead</comment>
+    </violation>
+
+    <violation>
+        <name>org/assertj/core/util/Files.newTemporaryFolder:()Ljava/io/File;</name>
+        <version>1.1</version>
+        <comment>Use @TempDir instead</comment>
+    </violation>
+
+    <violation>
+        <name>org/assertj/core/util/Files.temporaryFolder:()Ljava/io/File;</name>
+        <version>1.1</version>
+        <comment>Use @TempDir instead</comment>
+    </violation>
+
+    <violation>
+        <name>org/assertj/core/util/Files.temporaryFolderPath:()Ljava/lang/String;</name>
+        <version>1.1</version>
+        <comment>Use @TempDir instead</comment>
+    </violation>
+</modernizer>
diff --git a/hudi-trino-plugin/pom.xml b/hudi-trino-plugin/pom.xml
new file mode 100644
index 0000000000000..d819b3093e239
--- /dev/null
+++ b/hudi-trino-plugin/pom.xml
@@ -0,0 +1,460 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>io.trino</groupId>
+        <artifactId>trino-root</artifactId>
+        <version>472</version>
+    </parent>
+
+    <artifactId>trino-hudi</artifactId>
+    <packaging>trino-plugin</packaging>
+    <description>Trino - Hudi connector</description>
+
+    <properties>
+        <air.compiler.fail-warnings>true</air.compiler.fail-warnings>
+        <dep.hudi.version>1.0.2</dep.hudi.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo</artifactId>
+            <version>4.0.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.inject</groupId>
+            <artifactId>guice</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>bootstrap</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>configuration</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>units</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-manager</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-memory-context</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-metastore</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parquet</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-plugin-toolkit</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.validation</groupId>
+            <artifactId>jakarta.validation-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-common</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.dropwizard.metrics</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-io</artifactId>
+            <version>${dep.hudi.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-column</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.weakref</groupId>
+            <artifactId>jmxutils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>slice</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-api-incubator</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.opentelemetry</groupId>
+            <artifactId>opentelemetry-context</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openjdk.jol</groupId>
+            <artifactId>jol-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>log-manager</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive-formats</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>junit-extensions</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hdfs</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-hive</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-main</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-parser</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-spi</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing-containers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-testing-services</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-tpch</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino.hadoop</groupId>
+            <artifactId>hadoop-apache</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino.tpch</groupId>
+            <artifactId>tpch</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-client-common</artifactId>
+            <version>${dep.hudi.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hadoop-common</artifactId>
+            <version>${dep.hudi.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <version>${dep.hudi.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hudi</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+            <version>20250107</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.basepom.maven</groupId>
+                <artifactId>duplicate-finder-maven-plugin</artifactId>
+                <configuration>
+                    <ignoredResourcePatterns>
+                        <ignoredResourcePattern>log4j.properties</ignoredResourcePattern>
+                        <ignoredResourcePattern>log4j-surefire.properties</ignoredResourcePattern>
+                    </ignoredResourcePatterns>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java
new file mode 100644
index 0000000000000..0f1987594a1ac
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@BindingAnnotation
+public @interface ForHudiSplitManager {}
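Reviewer note: `@ForHudiSplitManager` (and the analogous `@ForHudiSplitSource` below) are standard Guice binding annotations that let the connector bind more than one `ExecutorService` and inject the right one by annotation. A minimal sketch of how a module might use it, assuming airlift's `daemonThreadsNamed` thread factory; the module class and thread-name format here are illustrative, not part of this diff:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;

public class ExampleExecutorModule
        extends AbstractModule
{
    @Provides
    @Singleton
    @ForHudiSplitManager
    public ExecutorService createSplitManagerExecutor()
    {
        // Passing an explicit ThreadFactory with named threads also satisfies the
        // modernizer rules above, which ban the no-arg Executors factories.
        return Executors.newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%s"));
    }
}
```

An injection point then selects the binding with `@ForHudiSplitManager ExecutorService executor` in its constructor parameters.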
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java
new file mode 100644
index 0000000000000..801b1e0309407
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/ForHudiSplitSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@BindingAnnotation
+public @interface ForHudiSplitSource {}
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiBaseFileOnlyPageSource.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiBaseFileOnlyPageSource.java
new file mode 100644
index 0000000000000..1180638a7cc47
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiBaseFileOnlyPageSource.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hudi.util.SynthesizedColumnHandler;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.connector.ConnectorPageSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This page source is for reading data columns in the Parquet format.
+ * This page source also avoids costly Avro IndexedRecord serialization.
+ */
+public class HudiBaseFileOnlyPageSource
+ implements ConnectorPageSource
+{
+ private final ConnectorPageSource dataPageSource;
+    private final List<HiveColumnHandle> allOutputColumns;
+ private final SynthesizedColumnHandler synthesizedColumnHandler;
+ // Maps output channel to physical source channel, or -1 if synthesized
+ private final int[] physicalSourceChannelMap;
+
+ public HudiBaseFileOnlyPageSource(
+ ConnectorPageSource dataPageSource,
+            List<HiveColumnHandle> allOutputColumns,
+            // Columns provided by dataPageSource
+            List<HiveColumnHandle> dataColumns,
+            // Handler for synthesized/virtual columns in Hudi tables, such as partition columns and file-level metadata, e.g. file size (not Hudi meta fields)
+            SynthesizedColumnHandler synthesizedColumnHandler)
+ {
+ this.dataPageSource = requireNonNull(dataPageSource, "dataPageSource is null");
+ this.allOutputColumns = ImmutableList.copyOf(requireNonNull(allOutputColumns, "allOutputColumns is null"));
+ this.synthesizedColumnHandler = requireNonNull(synthesizedColumnHandler, "synthesizedColumnHandler is null");
+
+ // Create a mapping from the channel index in the output page to the channel index in the physicalDataPageSource's page
+ this.physicalSourceChannelMap = new int[allOutputColumns.size()];
+        Map<String, Integer> physicalColumnNameToChannel = new HashMap<>();
+ for (int i = 0; i < dataColumns.size(); i++) {
+ physicalColumnNameToChannel.put(dataColumns.get(i).getName().toLowerCase(Locale.ENGLISH), i);
+ }
+
+ for (int i = 0; i < allOutputColumns.size(); i++) {
+ this.physicalSourceChannelMap[i] = physicalColumnNameToChannel.getOrDefault(allOutputColumns.get(i).getName().toLowerCase(Locale.ENGLISH), -1);
+ }
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return dataPageSource.getCompletedBytes();
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return dataPageSource.getReadTimeNanos();
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return dataPageSource.isFinished();
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ Page physicalSourcePage = dataPageSource.getNextPage();
+ if (physicalSourcePage == null) {
+ return null;
+ }
+
+ int positionCount = physicalSourcePage.getPositionCount();
+ if (positionCount == 0 && synthesizedColumnHandler.getSynthesizedColumnCount() == 0) {
+ // If only physical columns and page is empty
+ return physicalSourcePage;
+ }
+
+ Block[] outputBlocks = new Block[allOutputColumns.size()];
+ for (int i = 0; i < allOutputColumns.size(); i++) {
+ HiveColumnHandle outputColumn = allOutputColumns.get(i);
+ if (physicalSourceChannelMap[i] != -1) {
+ outputBlocks[i] = physicalSourcePage.getBlock(physicalSourceChannelMap[i]);
+ }
+ else {
+ // Column is synthesized
+ outputBlocks[i] = synthesizedColumnHandler.createRleSynthesizedBlock(outputColumn, positionCount);
+ }
+ }
+ return new Page(outputBlocks);
+ }
+
+ @Override
+ public long getMemoryUsage()
+ {
+ return dataPageSource.getMemoryUsage();
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ dataPageSource.close();
+ }
+}
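Reviewer note: synthesized columns are constant across every position of a split, so representing them as run-length-encoded blocks costs a single value per page regardless of row count. A hedged sketch of the idea using the public Trino SPI; this is not the connector's actual `SynthesizedColumnHandler`, and the helper below is hypothetical:

```java
import io.airlift.slice.Slices;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RunLengthEncodedBlock;

import static io.trino.spi.type.VarcharType.VARCHAR;

final class RleSketch
{
    private RleSketch() {}

    // Write the constant (e.g. a partition value) once, then logically repeat
    // it for every position in the page via run-length encoding.
    static Block constantVarcharBlock(String value, int positionCount)
    {
        BlockBuilder builder = VARCHAR.createBlockBuilder(null, 1);
        VARCHAR.writeSlice(builder, Slices.utf8Slice(value));
        return RunLengthEncodedBlock.create(builder.build(), positionCount);
    }
}
```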
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConfig.java
new file mode 100644
index 0000000000000..5138caea39c8d
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.DefunctConfig;
+import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
+import jakarta.validation.constraints.DecimalMax;
+import jakarta.validation.constraints.DecimalMin;
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static java.util.Locale.ENGLISH;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+@DefunctConfig({
+ "hudi.min-partition-batch-size",
+ "hudi.max-partition-batch-size"
+})
+public class HudiConfig
+{
+    private List<String> columnsToHide = ImmutableList.of();
+ private boolean tableStatisticsEnabled = true;
+ private int tableStatisticsExecutorParallelism = 4;
+ private boolean metadataEnabled;
+ private boolean shouldUseParquetColumnNames = true;
+ private boolean shouldUseParquetColumnIndex;
+ private boolean sizeBasedSplitWeightsEnabled = true;
+ private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE);
+ private double minimumAssignedSplitWeight = 0.05;
+ private DataSize targetSplitSize = DataSize.of(128, MEGABYTE);
+ private int maxSplitsPerSecond = Integer.MAX_VALUE;
+ private int maxOutstandingSplits = 1000;
+ private int splitLoaderParallelism = 4;
+ private int splitGeneratorParallelism = 4;
+ private long perTransactionMetastoreCacheMaximumSize = 2000;
+ private boolean queryPartitionFilterRequired;
+ private boolean ignoreAbsentPartitions;
+ private Duration dynamicFilteringWaitTimeout = new Duration(1, SECONDS);
+
+ // Internal configuration for debugging and testing
+ private boolean isRecordLevelIndexEnabled = true;
+ private boolean isSecondaryIndexEnabled = true;
+ private boolean isColumnStatsIndexEnabled = true;
+ private boolean isPartitionStatsIndexEnabled = true;
+ private Duration columnStatsWaitTimeout = new Duration(1, SECONDS);
+ private Duration recordIndexWaitTimeout = new Duration(2, SECONDS);
+ private Duration secondaryIndexWaitTimeout = new Duration(2, SECONDS);
+
+    public List<String> getColumnsToHide()
+ {
+ return columnsToHide;
+ }
+
+ @Config("hudi.columns-to-hide")
+ @ConfigDescription("List of column names that will be hidden from the query output. " +
+ "It can be used to hide Hudi meta fields. By default, no fields are hidden.")
+    public HudiConfig setColumnsToHide(List<String> columnsToHide)
+ {
+ this.columnsToHide = columnsToHide.stream()
+ .map(s -> s.toLowerCase(ENGLISH))
+ .collect(toImmutableList());
+ return this;
+ }
+
+ @Config("hudi.table-statistics-enabled")
+ @ConfigDescription("Enable table statistics for query planning.")
+ public HudiConfig setTableStatisticsEnabled(boolean tableStatisticsEnabled)
+ {
+ this.tableStatisticsEnabled = tableStatisticsEnabled;
+ return this;
+ }
+
+ public boolean isTableStatisticsEnabled()
+ {
+ return this.tableStatisticsEnabled;
+ }
+
+ @Min(1)
+ public int getTableStatisticsExecutorParallelism()
+ {
+ return tableStatisticsExecutorParallelism;
+ }
+
+ @Config("hudi.table-statistics-executor-parallelism")
+ @ConfigDescription("Number of threads to asynchronously generate table statistics.")
+ public HudiConfig setTableStatisticsExecutorParallelism(int parallelism)
+ {
+ this.tableStatisticsExecutorParallelism = parallelism;
+ return this;
+ }
+
+ @Config("hudi.metadata-enabled")
+ @ConfigDescription("Fetch the list of file names and sizes from Hudi metadata table rather than storage.")
+ public HudiConfig setMetadataEnabled(boolean metadataEnabled)
+ {
+ this.metadataEnabled = metadataEnabled;
+ return this;
+ }
+
+ public boolean isMetadataEnabled()
+ {
+ return this.metadataEnabled;
+ }
+
+ @Config("hudi.parquet.use-column-names")
+ @ConfigDescription("Access Parquet columns using names from the file. If disabled, then columns are accessed using index."
+ + "Only applicable to Parquet file format.")
+ public HudiConfig setUseParquetColumnNames(boolean shouldUseParquetColumnNames)
+ {
+ this.shouldUseParquetColumnNames = shouldUseParquetColumnNames;
+ return this;
+ }
+
+ public boolean getUseParquetColumnNames()
+ {
+ return this.shouldUseParquetColumnNames;
+ }
+
+ @Config("hudi.parquet.use-column-index")
+ @ConfigDescription("Enable using Parquet column indexes")
+ public HudiConfig setUseParquetColumnIndex(boolean shouldUseParquetColumnIndex)
+ {
+ this.shouldUseParquetColumnIndex = shouldUseParquetColumnIndex;
+ return this;
+ }
+
+ public boolean isUseParquetColumnIndex()
+ {
+ return this.shouldUseParquetColumnIndex;
+ }
+
+ @Config("hudi.size-based-split-weights-enabled")
+ @ConfigDescription("Unlike uniform splitting, size-based splitting ensures that each batch of splits has enough data to process. " +
+ "By default, it is enabled to improve performance.")
+ public HudiConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled)
+ {
+ this.sizeBasedSplitWeightsEnabled = sizeBasedSplitWeightsEnabled;
+ return this;
+ }
+
+ public boolean isSizeBasedSplitWeightsEnabled()
+ {
+ return sizeBasedSplitWeightsEnabled;
+ }
+
+ @Config("hudi.standard-split-weight-size")
+ @ConfigDescription("The split size corresponding to the standard weight (1.0) "
+ + "when size based split weights are enabled.")
+ public HudiConfig setStandardSplitWeightSize(DataSize standardSplitWeightSize)
+ {
+ this.standardSplitWeightSize = standardSplitWeightSize;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getStandardSplitWeightSize()
+ {
+ return standardSplitWeightSize;
+ }
+
+ @Config("hudi.minimum-assigned-split-weight")
+ @ConfigDescription("Minimum weight that a split can be assigned when size based split weights are enabled.")
+ public HudiConfig setMinimumAssignedSplitWeight(double minimumAssignedSplitWeight)
+ {
+ this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
+ return this;
+ }
+
+ @DecimalMax("1")
+ @DecimalMin(value = "0", inclusive = false)
+ public double getMinimumAssignedSplitWeight()
+ {
+ return minimumAssignedSplitWeight;
+ }
+
+ @Config("hudi.target-split-size")
+ @ConfigDescription("The target split size")
+ public HudiConfig setTargetSplitSize(DataSize targetSplitSize)
+ {
+ this.targetSplitSize = targetSplitSize;
+ return this;
+ }
+
+ @NotNull
+ public DataSize getTargetSplitSize()
+ {
+ return targetSplitSize;
+ }
+
+ @Min(1)
+ public int getMaxSplitsPerSecond()
+ {
+ return maxSplitsPerSecond;
+ }
+
+ @Config("hudi.max-splits-per-second")
+ @ConfigDescription("Rate at which splits are enqueued for processing. The queue will throttle if this rate limit is breached.")
+ public HudiConfig setMaxSplitsPerSecond(int maxSplitsPerSecond)
+ {
+ this.maxSplitsPerSecond = maxSplitsPerSecond;
+ return this;
+ }
+
+ @Min(1)
+ public int getMaxOutstandingSplits()
+ {
+ return maxOutstandingSplits;
+ }
+
+ @Config("hudi.max-outstanding-splits")
+ @ConfigDescription("Maximum outstanding splits in a batch enqueued for processing.")
+ public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits)
+ {
+ this.maxOutstandingSplits = maxOutstandingSplits;
+ return this;
+ }
+
+ @Min(1)
+ public int getSplitGeneratorParallelism()
+ {
+ return splitGeneratorParallelism;
+ }
+
+ @Config("hudi.split-generator-parallelism")
+ @ConfigDescription("Number of threads to generate splits from partitions.")
+ public HudiConfig setSplitGeneratorParallelism(int splitGeneratorParallelism)
+ {
+ this.splitGeneratorParallelism = splitGeneratorParallelism;
+ return this;
+ }
+
+ @Min(1)
+ public int getSplitLoaderParallelism()
+ {
+ return splitLoaderParallelism;
+ }
+
+ @Config("hudi.split-loader-parallelism")
+ @ConfigDescription("Number of threads to run background split loader. A single background split loader is needed per query.")
+ public HudiConfig setSplitLoaderParallelism(int splitLoaderParallelism)
+ {
+ this.splitLoaderParallelism = splitLoaderParallelism;
+ return this;
+ }
+
+ @Min(1)
+ public long getPerTransactionMetastoreCacheMaximumSize()
+ {
+ return perTransactionMetastoreCacheMaximumSize;
+ }
+
+ @Config("hudi.per-transaction-metastore-cache-maximum-size")
+ public HudiConfig setPerTransactionMetastoreCacheMaximumSize(long perTransactionMetastoreCacheMaximumSize)
+ {
+ this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize;
+ return this;
+ }
+
+ @Config("hudi.query-partition-filter-required")
+ @ConfigDescription("Require a filter on at least one partition column")
+ public HudiConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
+ {
+ this.queryPartitionFilterRequired = queryPartitionFilterRequired;
+ return this;
+ }
+
+ public boolean isQueryPartitionFilterRequired()
+ {
+ return queryPartitionFilterRequired;
+ }
+
+ @Config("hudi.ignore-absent-partitions")
+ public HudiConfig setIgnoreAbsentPartitions(boolean ignoreAbsentPartitions)
+ {
+ this.ignoreAbsentPartitions = ignoreAbsentPartitions;
+ return this;
+ }
+
+ public boolean isIgnoreAbsentPartitions()
+ {
+ return ignoreAbsentPartitions;
+ }
+
+ @Config("hudi.index.record-level-index-enabled")
+ @ConfigDescription("Internal configuration to control whether record level index is enabled for debugging/testing.")
+ public HudiConfig setRecordLevelIndexEnabled(boolean isRecordLevelIndexEnabled)
+ {
+ this.isRecordLevelIndexEnabled = isRecordLevelIndexEnabled;
+ return this;
+ }
+
+ public boolean isRecordLevelIndexEnabled()
+ {
+ return isRecordLevelIndexEnabled;
+ }
+
+ @Config("hudi.index.secondary-index-enabled")
+ @ConfigDescription("Internal configuration to control whether secondary index is enabled for debugging/testing.")
+ public HudiConfig setSecondaryIndexEnabled(boolean isSecondaryIndexEnabled)
+ {
+ this.isSecondaryIndexEnabled = isSecondaryIndexEnabled;
+ return this;
+ }
+
+ public boolean isSecondaryIndexEnabled()
+ {
+ return isSecondaryIndexEnabled;
+ }
+
+ @Config("hudi.index.column-stats-index-enabled")
+ @ConfigDescription("Internal configuration to control whether column stats index is enabled for debugging/testing.")
+ public HudiConfig setColumnStatsIndexEnabled(boolean isColumnStatsIndexEnabled)
+ {
+ this.isColumnStatsIndexEnabled = isColumnStatsIndexEnabled;
+ return this;
+ }
+
+ public boolean isColumnStatsIndexEnabled()
+ {
+ return isColumnStatsIndexEnabled;
+ }
+
+ @Config("hudi.index.partition-stats-index-enabled")
+ @ConfigDescription("Internal configuration to control whether partition stats index is enabled for debugging/testing.")
+ public HudiConfig setPartitionStatsIndexEnabled(boolean isPartitionStatsIndexEnabled)
+ {
+ this.isPartitionStatsIndexEnabled = isPartitionStatsIndexEnabled;
+ return this;
+ }
+
+ public boolean isPartitionStatsIndexEnabled()
+ {
+ return isPartitionStatsIndexEnabled;
+ }
+
+ @Config("hudi.dynamic-filtering.wait-timeout")
+ @ConfigDescription("Maximum timeout to wait for dynamic filtering, e.g. 1000ms, 20s, 2m, 1h")
+ public HudiConfig setDynamicFilteringWaitTimeout(Duration dynamicFilteringWaitTimeout)
+ {
+ this.dynamicFilteringWaitTimeout = dynamicFilteringWaitTimeout;
+ return this;
+ }
+
+ @NotNull
+ public Duration getDynamicFilteringWaitTimeout()
+ {
+ return dynamicFilteringWaitTimeout;
+ }
+
+ @Config("hudi.index.column-stats.wait-timeout")
+ @ConfigDescription("Maximum timeout to wait for loading column stats, e.g. 1000ms, 20s")
+    public HudiConfig setColumnStatsWaitTimeout(Duration columnStatsWaitTimeout)
+    {
+        this.columnStatsWaitTimeout = columnStatsWaitTimeout;
+ return this;
+ }
+
+ @NotNull
+ public Duration getColumnStatsWaitTimeout()
+ {
+ return columnStatsWaitTimeout;
+ }
+
+ @Config("hudi.index.record-index.wait-timeout")
+ @ConfigDescription("Maximum timeout to wait for loading record index, e.g. 1000ms, 20s")
+ public HudiConfig setRecordIndexWaitTimeout(Duration recordIndexWaitTimeout)
+ {
+ this.recordIndexWaitTimeout = recordIndexWaitTimeout;
+ return this;
+ }
+
+ @NotNull
+ public Duration getRecordIndexWaitTimeout()
+ {
+ return recordIndexWaitTimeout;
+ }
+
+ @Config("hudi.index.secondary-index.wait-timeout")
+ @ConfigDescription("Maximum timeout to wait for loading secondary index, e.g. 1000ms, 20s")
+ public HudiConfig setSecondaryIndexWaitTimeout(Duration secondaryIndexWaitTimeout)
+ {
+ this.secondaryIndexWaitTimeout = secondaryIndexWaitTimeout;
+ return this;
+ }
+
+ @NotNull
+ public Duration getSecondaryIndexWaitTimeout()
+ {
+ return secondaryIndexWaitTimeout;
+ }
+}
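Reviewer note: each `@Config` annotation above names the catalog property that airlift's configuration framework binds to the corresponding setter. A minimal sketch of the fluent setter chain; in a deployment the same values would come from the catalog properties file, and the values below are illustrative only:

```java
import io.airlift.units.DataSize;
import io.airlift.units.Duration;

import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.SECONDS;

final class HudiConfigSketch
{
    private HudiConfigSketch() {}

    static HudiConfig example()
    {
        // Each setter mirrors a catalog property: hudi.metadata-enabled,
        // hudi.target-split-size, hudi.dynamic-filtering.wait-timeout
        return new HudiConfig()
                .setMetadataEnabled(true)
                .setTargetSplitSize(DataSize.of(256, MEGABYTE))
                .setDynamicFilteringWaitTimeout(new Duration(10, SECONDS));
    }
}
```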
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnector.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnector.java
new file mode 100644
index 0000000000000..072005cd3ffc4
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnector.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.hive.HiveTransactionHandle;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.transaction.IsolationLevel;
+
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.spi.transaction.IsolationLevel.SERIALIZABLE;
+import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class HudiConnector
+ implements Connector
+{
+ private final Injector injector;
+ private final LifeCycleManager lifeCycleManager;
+ private final HudiTransactionManager transactionManager;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorNodePartitioningProvider nodePartitioningProvider;
+    private final Set<SystemTable> systemTables;
+    private final List<PropertyMetadata<?>> sessionProperties;
+    private final List<PropertyMetadata<?>> tableProperties;
+
+ public HudiConnector(
+ Injector injector,
+ LifeCycleManager lifeCycleManager,
+ HudiTransactionManager transactionManager,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ConnectorNodePartitioningProvider nodePartitioningProvider,
+            Set<SystemTable> systemTables,
+            Set<SessionPropertiesProvider> sessionPropertiesProviders,
+            List<PropertyMetadata<?>> tableProperties)
+ {
+ this.injector = requireNonNull(injector, "injector is null");
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");
+ this.systemTables = ImmutableSet.copyOf(requireNonNull(systemTables, "systemTables is null"));
+ this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
+ .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
+ .collect(toImmutableList());
+ this.tableProperties = ImmutableList.copyOf(requireNonNull(tableProperties, "tableProperties is null"));
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transactionHandle)
+ {
+ ConnectorMetadata metadata = transactionManager.get(transactionHandle, session.getIdentity());
+ return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+
+ @Override
+ public ConnectorNodePartitioningProvider getNodePartitioningProvider()
+ {
+ return nodePartitioningProvider;
+ }
+
+ @Override
+    public Set<SystemTable> getSystemTables()
+ {
+ return systemTables;
+ }
+
+ @Override
+    public List<PropertyMetadata<?>> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+
+ @Override
+    public List<PropertyMetadata<?>> getTableProperties()
+ {
+ return tableProperties;
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
+ {
+ checkConnectorSupports(SERIALIZABLE, isolationLevel);
+ ConnectorTransactionHandle transaction = new HiveTransactionHandle(true);
+ try (ThreadContextClassLoader _ = new ThreadContextClassLoader(getClass().getClassLoader())) {
+ transactionManager.put(transaction);
+ }
+ return transaction;
+ }
+
+ @Override
+ public void commit(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.commit(transaction);
+ }
+
+ @Override
+ public void rollback(ConnectorTransactionHandle transaction)
+ {
+ transactionManager.rollback(transaction);
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ lifeCycleManager.stop();
+ }
+
+ @VisibleForTesting
+ public Injector getInjector()
+ {
+ return injector;
+ }
+}
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
new file mode 100644
index 0000000000000..f7dd96d767991
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.trace.Tracer;
+import io.trino.filesystem.manager.FileSystemModule;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider;
+import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager;
+import io.trino.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider;
+import io.trino.plugin.base.jmx.MBeanServerModule;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.hive.metastore.HiveMetastoreModule;
+import io.trino.spi.NodeManager;
+import io.trino.spi.catalog.CatalogName;
+import io.trino.spi.classloader.ThreadContextClassLoader;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorContext;
+import io.trino.spi.connector.ConnectorFactory;
+import io.trino.spi.connector.ConnectorNodePartitioningProvider;
+import io.trino.spi.connector.ConnectorPageSourceProvider;
+import io.trino.spi.connector.ConnectorSplitManager;
+import io.trino.spi.type.TypeManager;
+import org.weakref.jmx.guice.MBeanModule;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.inject.util.Modules.EMPTY_MODULE;
+import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch;
+
+public class HudiConnectorFactory
+ implements ConnectorFactory
+{
+ @Override
+ public String getName()
+ {
+ return "hudi";
+ }
+
+ @Override
+    public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
+ {
+ checkStrictSpiVersionMatch(context, this);
+ return createConnector(catalogName, config, context, Optional.empty());
+ }
+
+ public static Connector createConnector(
+ String catalogName,
+            Map<String, String> config,
+ ConnectorContext context,
+ Optional module)
+ {
+ ClassLoader classLoader = HudiConnectorFactory.class.getClassLoader();
+ try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
+ Bootstrap app = new Bootstrap(
+ new MBeanModule(),
+ new JsonModule(),
+ new HudiModule(),
+ new HiveMetastoreModule(Optional.empty()),
+ new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry(), false),
+ new MBeanServerModule(),
+ module.orElse(EMPTY_MODULE),
+ binder -> {
+ binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
+ binder.bind(Tracer.class).toInstance(context.getTracer());
+ binder.bind(NodeVersion.class).toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName));
+ });
+
+ Injector injector = app
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
+
+ LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+ HudiTransactionManager transactionManager = injector.getInstance(HudiTransactionManager.class);
+ ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+ ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+ ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
+            Set<SessionPropertiesProvider> sessionPropertiesProviders = injector.getInstance(new Key<>() {});
+ HudiTableProperties hudiTableProperties = injector.getInstance(HudiTableProperties.class);
+
+ return new HudiConnector(
+ injector,
+ lifeCycleManager,
+ transactionManager,
+ new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+ new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+ new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+ ImmutableSet.of(),
+ sessionPropertiesProviders,
+ hudiTableProperties.getTableProperties());
+ }
+ }
+}
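Reviewer note: Trino discovers connector factories through the `io.trino.spi.Plugin` SPI. The plugin class itself is not part of this hunk, so the registration presumably looks along these lines (class name assumed):

```java
import com.google.common.collect.ImmutableList;
import io.trino.spi.Plugin;
import io.trino.spi.connector.ConnectorFactory;

public class HudiPlugin
        implements Plugin
{
    @Override
    public Iterable<ConnectorFactory> getConnectorFactories()
    {
        // Exposes the "hudi" connector name returned by HudiConnectorFactory.getName()
        return ImmutableList.of(new HudiConnectorFactory());
    }
}
```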
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
new file mode 100644
index 0000000000000..6b34fedc8853c
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import io.trino.spi.ErrorCode;
+import io.trino.spi.ErrorCodeSupplier;
+import io.trino.spi.ErrorType;
+
+import static io.trino.spi.ErrorType.EXTERNAL;
+
+public enum HudiErrorCode
+ implements ErrorCodeSupplier
+{
+ // HUDI_UNKNOWN_TABLE_TYPE(0, EXTERNAL),
+ HUDI_INVALID_PARTITION_VALUE(1, EXTERNAL),
+ HUDI_BAD_DATA(2, EXTERNAL),
+ // HUDI_MISSING_DATA(3, EXTERNAL) is deprecated
+ HUDI_CANNOT_OPEN_SPLIT(4, EXTERNAL),
+ HUDI_UNSUPPORTED_FILE_FORMAT(5, EXTERNAL),
+ HUDI_CURSOR_ERROR(6, EXTERNAL),
+ HUDI_FILESYSTEM_ERROR(7, EXTERNAL),
+ HUDI_PARTITION_NOT_FOUND(8, EXTERNAL),
+ HUDI_UNSUPPORTED_TABLE_TYPE(9, EXTERNAL),
+ HUDI_NO_VALID_COMMIT(10, EXTERNAL),
+ HUDI_META_CLIENT_ERROR(11, EXTERNAL),
+ /**/;
+
+ private final ErrorCode errorCode;
+
+ HudiErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0507_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
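Reviewer note: the `0x0507_0000` base gives every entry a globally unique error code (e.g. `HUDI_CANNOT_OPEN_SPLIT` surfaces as `0x0507_0004`). A hedged usage sketch; the helper method here is hypothetical, the connector throws `TrinoException` with these codes directly:

```java
import io.trino.spi.TrinoException;

import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;

final class HudiErrors
{
    private HudiErrors() {}

    // Wrap an I/O failure in a TrinoException carrying the connector error code
    static TrinoException cannotOpenSplit(String path, Throwable cause)
    {
        return new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Could not open split: " + path, cause);
    }
}
```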
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java
new file mode 100644
index 0000000000000..56d585db87721
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiFileStatus.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import io.trino.filesystem.Location;
+
+import static java.util.Objects.requireNonNull;
+
+public record HudiFileStatus(Location location, boolean isDirectory, long length, long modificationTime, long blockSize)
+{
+ public HudiFileStatus
+ {
+ requireNonNull(location, "location is null");
+ }
+}
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
new file mode 100644
index 0000000000000..bbe5564b1d154
--- /dev/null
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.log.Logger;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.metastore.Column;
+import io.trino.metastore.HiveMetastore;
+import io.trino.metastore.Table;
+import io.trino.metastore.TableInfo;
+import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hudi.stats.HudiTableStatistics;
+import io.trino.plugin.hudi.stats.TableStatisticsReader;
+import io.trino.plugin.hudi.util.HudiTableTypeUtils;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.ConnectorTableVersion;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.ConstraintApplicationResult;
+import io.trino.spi.connector.RelationColumnsMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.SystemTable;
+import io.trino.spi.connector.TableNotFoundException;
+import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.statistics.Estimate;
+import io.trino.spi.statistics.TableStatistics;
+import io.trino.spi.type.TypeManager;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.util.Lazy;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
+import static io.trino.metastore.Table.TABLE_COMMENT;
+import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
+import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
+import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
+import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
+import static io.trino.plugin.hive.util.HiveUtil.isHudiTable;
+import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
+import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataTableEnabled;
+import static io.trino.plugin.hudi.HudiSessionProperties.isQueryPartitionFilterRequired;
+import static io.trino.plugin.hudi.HudiSessionProperties.isTableStatisticsEnabled;
+import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
+import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
+import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
+import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
+import static io.trino.spi.connector.SchemaTableName.schemaTableName;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.INDEXING_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class HudiMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger log = Logger.get(HudiMetadata.class);
+    private static final Map<SchemaTableName, HudiTableStatistics> tableStatisticsCache = new ConcurrentHashMap<>();
+    private static final Set<SchemaTableName> refreshingKeysInProgress = ConcurrentHashMap.newKeySet();
+ private final HiveMetastore metastore;
+ private final TrinoFileSystemFactory fileSystemFactory;
+ private final TypeManager typeManager;
+ private final ExecutorService tableStatisticsExecutor;
+
+ public HudiMetadata(
+ HiveMetastore metastore,
+ TrinoFileSystemFactory fileSystemFactory,
+ TypeManager typeManager,
+ ExecutorService tableStatisticsExecutor)
+ {
+ this.metastore = requireNonNull(metastore, "metastore is null");
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.tableStatisticsExecutor = requireNonNull(tableStatisticsExecutor, "tableStatisticsExecutor is null");
+ }
+
+ @Override
+    public List<String> listSchemaNames(ConnectorSession session)
+ {
+ return metastore.getAllDatabases().stream()
+ .filter(schemaName -> !isHiveSystemSchema(schemaName))
+ .collect(toImmutableList());
+ }
+
+ @Override
+    public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
+ {
+ log.info("Creating new HudiTableHandle for %s", tableName);
+ if (startVersion.isPresent() || endVersion.isPresent()) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
+ }
+
+ if (isHiveSystemSchema(tableName.getSchemaName())) {
+ return null;
+ }
+        Optional<Table> tableOpt = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
+ if (tableOpt.isEmpty()) {
+ return null;
+ }
+
+ Table table = tableOpt.get();
+ if (!isHudiTable(table)) {
+ throw new TrinoException(UNSUPPORTED_TABLE_TYPE, format("Not a Hudi table: %s", tableName));
+ }
+ String basePath = table.getStorage().getLocation();
+ TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+ String inputFormat = table.getStorage().getStorageFormat().getInputFormat();
+ HoodieTableType hoodieTableType = HudiTableTypeUtils.fromInputFormat(inputFormat);
+
+ return new HudiTableHandle(
+ Optional.of(table),
+ Optional.of(Lazy.lazily(() -> buildTableMetaClient(fileSystem, tableName.toString(), basePath))),
+ tableName.getSchemaName(),
+ tableName.getTableName(),
+ table.getStorage().getLocation(),
+ hoodieTableType,
+ getPartitionKeyColumnHandles(table, typeManager),
+ ImmutableSet.of(),
+ TupleDomain.all(),
+ TupleDomain.all());
+ }
+
+ @Override
+    public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
+ {
+ return getRawSystemTable(tableName, session)
+ .map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
+ }
+
+    private Optional<SystemTable> getRawSystemTable(SchemaTableName tableName, ConnectorSession session)
+ {
+        Optional<HudiTableName> nameOptional = HudiTableName.from(tableName.getTableName());
+ if (nameOptional.isEmpty()) {
+ return Optional.empty();
+ }
+ HudiTableName name = nameOptional.get();
+ if (name.tableType() == TableType.DATA) {
+ return Optional.empty();
+ }
+
+        Optional<Table> tableOptional = metastore.getTable(tableName.getSchemaName(), name.tableName());
+ if (tableOptional.isEmpty()) {
+ return Optional.empty();
+ }
+ if (!isHudiTable(tableOptional.get())) {
+ return Optional.empty();
+ }
+ return switch (name.tableType()) {
+ case DATA -> throw new AssertionError();
+ case TIMELINE -> {
+ SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.tableNameWithType());
+ yield Optional.of(new TimelineTable(fileSystemFactory.create(session), systemTableName, tableOptional.get()));
+ }
+ };
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) table;
+ return getTableMetadata(hudiTableHandle.getSchemaTableName(), getColumnsToHide(session));
+ }
+
+ @Override
+    public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
+ {
+ HudiTableHandle handle = (HudiTableHandle) tableHandle;
+ HudiPredicates predicates = HudiPredicates.from(constraint.getSummary());
+        TupleDomain<HiveColumnHandle> regularColumnPredicates = predicates.getRegularColumnPredicates();
+        TupleDomain<HiveColumnHandle> partitionColumnPredicates = predicates.getPartitionColumnPredicates();
+
+        // TODO: constraint#predicate isn't utilized during split generation,
+        //  so don't add constraint#predicateColumns to newConstraintColumns.
+        Set<HiveColumnHandle> newConstraintColumns = Stream.concat(
+ Stream.concat(
+ regularColumnPredicates.getDomains().stream()
+ .map(Map::keySet)
+ .flatMap(Collection::stream),
+ partitionColumnPredicates.getDomains().stream()
+ .map(Map::keySet)
+ .flatMap(Collection::stream)),
+ handle.getConstraintColumns().stream())
+ .collect(toImmutableSet());
+
+ HudiTableHandle newHudiTableHandle = handle.applyPredicates(
+ newConstraintColumns,
+ partitionColumnPredicates,
+ regularColumnPredicates);
+
+ if (handle.getPartitionPredicates().equals(newHudiTableHandle.getPartitionPredicates())
+ && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())
+ && handle.getConstraintColumns().equals(newHudiTableHandle.getConstraintColumns())) {
+ return Optional.empty();
+ }
+
+ return Optional.of(new ConstraintApplicationResult<>(
+ newHudiTableHandle,
+ newHudiTableHandle.getRegularPredicates().transformKeys(ColumnHandle.class::cast),
+ constraint.getExpression(),
+ false));
+ }
+
+ @Override
+    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ HudiTableHandle hudiTableHandle = (HudiTableHandle) tableHandle;
+ Table table = metastore.getTable(hudiTableHandle.getSchemaName(), hudiTableHandle.getTableName())
+ .orElseThrow(() -> new TableNotFoundException(schemaTableName(hudiTableHandle.getSchemaName(), hudiTableHandle.getTableName())));
+ return hiveColumnHandles(table, typeManager, NANOSECONDS).stream()
+ .collect(toImmutableMap(HiveColumnHandle::getName, identity()));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((HiveColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Optional