diff --git a/pom.xml b/pom.xml
index a5d1cd4232efe..0905fa835c5f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,7 @@
0.19.0
2.3.1
0.13.1
+ 0.5.0-incubating
1.18.3
3.3.0
2.6.0
@@ -191,6 +192,7 @@
redis-hbo-provider
presto-singlestore
presto-hana
+ presto-paimon
@@ -1333,6 +1335,24 @@
+
+
+ org.apache.paimon
+ paimon-bundle
+ ${dep.paimon.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.google.guava
+ guava
+
+
+
+
+
net.sf.opencsv
diff --git a/presto-paimon/pom.xml b/presto-paimon/pom.xml
new file mode 100644
index 0000000000000..991667cf69839
--- /dev/null
+++ b/presto-paimon/pom.xml
@@ -0,0 +1,231 @@
+
+
+ 4.0.0
+
+ presto-root
+ com.facebook.presto
+ 0.285-SNAPSHOT
+
+ presto-paimon
+ Presto - Paimon Connector
+ presto-plugin
+
+
+ ${project.parent.basedir}
+
+
+
+
+ com.google.guava
+ guava
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+
+
+ compile
+
+
+
+ com.google.inject
+ guice
+
+
+
+ io.airlift
+ aircompressor
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ org.weakref
+ jmxutils
+
+
+
+ com.facebook.airlift
+ bootstrap
+
+
+
+ com.facebook.airlift
+ concurrent
+
+
+
+ com.facebook.airlift
+ configuration
+
+
+
+ com.facebook.airlift
+ event
+
+
+
+ com.facebook.airlift
+ json
+
+
+
+ com.facebook.airlift
+ log
+
+
+
+ com.facebook.presto
+ presto-cache
+
+
+
+ com.facebook.presto
+ presto-memory-context
+
+
+
+ com.facebook.presto
+ presto-plugin-toolkit
+
+
+
+ com.facebook.presto
+ presto-parquet
+
+
+
+ com.facebook.presto
+ presto-hive-common
+
+
+
+ com.facebook.presto
+ presto-hive-metastore
+
+
+
+ com.facebook.presto
+ presto-hive
+
+
+
+ com.facebook.presto.hadoop
+ hadoop-apache2
+
+
+
+ com.facebook.presto.hive
+ hive-apache
+
+
+
+ javax.validation
+ validation-api
+
+
+
+ org.apache.paimon
+ paimon-bundle
+
+
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ com.facebook.presto
+ presto-tests
+ test
+
+
+
+ com.facebook.presto
+ presto-tpch
+ test
+
+
+
+ com.facebook.presto
+ presto-main
+ test
+
+
+
+ com.facebook.presto
+ presto-hive
+ test-jar
+ test
+
+
+
+ com.facebook.presto
+ presto-hive-metastore
+ test-jar
+ test
+
+
+
+
+ com.facebook.airlift
+ log-manager
+ runtime
+
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ com.facebook.presto
+ presto-common
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ com.facebook.drift
+ drift-api
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ 2.10.2
+ provided
+
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java
new file mode 100644
index 0000000000000..2c5a381d1b45c
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import java.util.function.Supplier;
+
+/** Utils for {@link ClassLoader}. */
+public class ClassLoaderUtils {
+
+ public static T runWithContextClassLoader(Supplier supplier, ClassLoader classLoader) {
+ ClassLoader previous = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
+ try {
+ return supplier.get();
+ } finally {
+ Thread.currentThread().setContextClassLoader(previous);
+ }
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java
new file mode 100644
index 0000000000000..eee3d566ebee6
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.util.Base64;
+
+/** Utils for encoding. */
+public class EncodingUtils {
+
+ private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();
+
+ private static final Base64.Decoder BASE64_DECODER = Base64.getUrlDecoder();
+
+ public static String encodeObjectToString(T t) {
+ try {
+ byte[] bytes = InstantiationUtil.serializeObject(t);
+ return new String(BASE64_ENCODER.encode(bytes), UTF_8);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static T decodeStringToObject(String encodedStr) {
+ final byte[] bytes = BASE64_DECODER.decode(encodedStr.getBytes(UTF_8));
+ try {
+ return InstantiationUtil.deserializeObject(bytes, EncodingUtils.class.getClassLoader());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java
new file mode 100644
index 0000000000000..ff6e8a4e20312
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utils for fieldName. */
+public class FieldNameUtils {
+
+ public static List fieldNames(RowType rowType) {
+ return rowType.getFields().stream()
+ .map(DataField::name)
+ .map(String::toLowerCase)
+ .collect(Collectors.toList());
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java
new file mode 100644
index 0000000000000..ef74cdefd3ad1
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static java.util.Objects.requireNonNull;
+
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+/** Presto {@link ColumnHandle}. */
+public class PaimonColumnHandle implements ColumnHandle {
+
+ private final String columnName;
+ private final String typeString;
+ private final Type prestoType;
+
+ @JsonCreator
+ public PaimonColumnHandle(
+ @JsonProperty("columnName") String columnName,
+ @JsonProperty("typeString") String typeString,
+ @JsonProperty("prestoType") Type prestoType) {
+ this.columnName = requireNonNull(columnName, "columnName is null");
+ this.typeString = requireNonNull(typeString, "typeString is null");
+ this.prestoType = requireNonNull(prestoType, "columnType is null");
+ }
+
+ public static PaimonColumnHandle create(
+ String columnName, DataType columnType, TypeManager typeManager) {
+ return new PaimonColumnHandle(
+ columnName,
+ JsonSerdeUtil.toJson(columnType),
+ PaimonTypeUtils.toPrestoType(columnType, typeManager));
+ }
+
+ @JsonProperty
+ public String getColumnName() {
+ return columnName;
+ }
+
+ @JsonProperty
+ public String getTypeString() {
+ return typeString;
+ }
+
+ public DataType paimonType() {
+ return JsonSerdeUtil.fromJson(typeString, DataType.class);
+ }
+
+ @JsonProperty
+ public Type getPrestoType() {
+ return prestoType;
+ }
+
+ public ColumnMetadata getColumnMetadata() {
+ return new ColumnMetadata(columnName, prestoType);
+ }
+
+ @Override
+ public int hashCode() {
+ return columnName.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+
+ PaimonColumnHandle other = (PaimonColumnHandle) obj;
+ return columnName.equals(other.columnName);
+ }
+
+ @Override
+ public String toString() {
+ return "{"
+ + "columnName='"
+ + columnName
+ + '\''
+ + ", typeString='"
+ + typeString
+ + '\''
+ + ", prestoType="
+ + prestoType
+ + '}';
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java
new file mode 100644
index 0000000000000..2566b494b3e72
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.airlift.configuration.Config;
+
+/** Used for configuration item inspection and management. */
+public class PaimonConfig {
+
+ private String warehouse;
+ private String metastore;
+ private String uri;
+
+ public String getWarehouse() {
+ return warehouse;
+ }
+
+ @Config("warehouse")
+ public PaimonConfig setWarehouse(String warehouse) {
+ this.warehouse = warehouse;
+ return this;
+ }
+
+ public String getMetastore() {
+ return metastore;
+ }
+
+ @Config("metastore")
+ public PaimonConfig setMetastore(String metastore) {
+ this.metastore = metastore;
+ return this;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ @Config("uri")
+ public PaimonConfig setUri(String uri) {
+ this.uri = uri;
+ return this;
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java
new file mode 100644
index 0000000000000..69aa742eb1b30
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorCommitHandle;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.connector.EmptyConnectorCommitHandle;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+
+import javax.inject.Inject;
+
+/** Presto {@link Connector}. */
+public class PaimonConnector implements Connector {
+
+ private final PaimonTransactionManager transactionManager;
+ private final PaimonSplitManager paimonSplitManager;
+ private final PaimonPageSourceProvider paimonPageSourceProvider;
+ private final PaimonMetadata paimonMetadata;
+
+ @Inject
+ public PaimonConnector(
+ PaimonTransactionManager transactionManager,
+ PaimonSplitManager paimonSplitManager,
+ PaimonPageSourceProvider paimonPageSourceProvider,
+ PaimonMetadata paimonMetadata) {
+ this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+ this.paimonSplitManager = requireNonNull(paimonSplitManager, "paimonSplitManager is null");
+ this.paimonPageSourceProvider =
+ requireNonNull(paimonPageSourceProvider, "paimonPageSourceProvider is null");
+ this.paimonMetadata = requireNonNull(paimonMetadata, "paimonMetadata is null");
+ }
+
+ @Override
+ public ConnectorCommitHandle commit(ConnectorTransactionHandle transaction) {
+ transactionManager.remove(transaction);
+ return EmptyConnectorCommitHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(
+ IsolationLevel isolationLevel, boolean readOnly) {
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ ConnectorTransactionHandle transaction = new PaimonTransactionHandle();
+ try (ThreadContextClassLoader ignored =
+ new ThreadContextClassLoader(getClass().getClassLoader())) {
+ transactionManager.put(transaction, paimonMetadata);
+ }
+ return transaction;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+ ConnectorMetadata metadata = transactionManager.get(transactionHandle);
+ return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager() {
+ return paimonSplitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider() {
+ return paimonPageSourceProvider;
+ }
+
+ @Override
+ public void rollback(ConnectorTransactionHandle transaction) {
+ transactionManager.remove(transaction);
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java
new file mode 100644
index 0000000000000..996d3c58c0554
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
+import com.facebook.airlift.bootstrap.Bootstrap;
+import com.facebook.airlift.json.JsonModule;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.inject.Injector;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+/** Presto {@link ConnectorFactory}. */
+public class PaimonConnectorFactory implements ConnectorFactory {
+
+ @Override
+ public String getName() {
+ return "paimon";
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver() {
+ return new PaimonHandleResolver();
+ }
+
+ @Override
+ public Connector create(
+ String catalogName, Map config, ConnectorContext context) {
+ requireNonNull(config, "config is null");
+
+ try {
+ Bootstrap app =
+ new Bootstrap(
+ new JsonModule(),
+ new PaimonModule(
+ catalogName,
+ context.getTypeManager(),
+ context.getFunctionMetadataManager(),
+ context.getStandardFunctionResolution(),
+ context.getRowExpressionService(),
+ config));
+
+ Bootstrap bootstrap =
+ app.doNotInitializeLogging().setRequiredConfigurationProperties(config).quiet();
+ try {
+ // Using reflection to achieve compatibility with different versions of
+ // dependencies.
+ Method noStrictConfigMethod = Bootstrap.class.getMethod("noStrictConfig");
+ noStrictConfigMethod.invoke(bootstrap);
+ } catch (NoSuchMethodException e) {
+ // ignore
+ }
+ Injector injector = bootstrap.initialize();
+
+ return injector.getInstance(PaimonConnector.class);
+ } catch (Exception e) {
+ throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java
new file mode 100644
index 0000000000000..8cc96200ead29
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Objects;
+
+/** Wrap for connector id. */
+public final class PaimonConnectorId {
+
+ private final String id;
+
+ public PaimonConnectorId(String id) {
+ this.id = requireNonNull(id, "id is null");
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+ PaimonConnectorId other = (PaimonConnectorId) obj;
+ return Objects.equals(this.id, other.id);
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java
new file mode 100644
index 0000000000000..1ae4867f1f9ab
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.common.predicate.Domain;
+import com.facebook.presto.common.predicate.Range;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.ArrayType;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.CharType;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.Decimals;
+import com.facebook.presto.common.type.DoubleType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.MapType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import io.airlift.slice.Slice;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.shade.guava30.com.google.common.base.Preconditions;
+import org.apache.paimon.types.RowType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/** Presto filter to Paimon predicate. */
+public class PaimonFilterConverter {
+
+ private final RowType rowType;
+ private final PredicateBuilder builder;
+
+ public PaimonFilterConverter(RowType rowType) {
+ this.rowType = rowType;
+ this.builder = new PredicateBuilder(rowType);
+ }
+
+ public Optional convert(TupleDomain tupleDomain) {
+ if (tupleDomain.isAll()) {
+ return Optional.empty();
+ }
+
+ if (!tupleDomain.getDomains().isPresent()) {
+ return Optional.empty();
+ }
+
+ Map domainMap = tupleDomain.getDomains().get();
+ List conjuncts = new ArrayList<>();
+ for (Map.Entry entry : domainMap.entrySet()) {
+ PaimonColumnHandle columnHandle = entry.getKey();
+ Domain domain = entry.getValue();
+ int index = rowType.getFieldNames().indexOf(columnHandle.getColumnName());
+ if (index != -1) {
+ try {
+ conjuncts.add(toPredicate(index, columnHandle.getPrestoType(), domain));
+ } catch (UnsupportedOperationException ignored) {
+ }
+ }
+ }
+
+ if (conjuncts.isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(PredicateBuilder.and(conjuncts));
+ }
+
+ private Predicate toPredicate(int columnIndex, Type type, Domain domain) {
+ if (domain.isAll()) {
+ // TODO alwaysTrue
+ throw new UnsupportedOperationException();
+ }
+ if (domain.getValues().isNone()) {
+ if (domain.isNullAllowed()) {
+ return builder.isNull((columnIndex));
+ }
+ // TODO alwaysFalse
+ throw new UnsupportedOperationException();
+ }
+
+ if (domain.getValues().isAll()) {
+ if (domain.isNullAllowed()) {
+ // TODO alwaysTrue
+ throw new UnsupportedOperationException();
+ }
+ return builder.isNotNull((columnIndex));
+ }
+
+ // TODO support structural types
+ if (type instanceof ArrayType
+ || type instanceof MapType
+ || type instanceof com.facebook.presto.common.type.RowType) {
+ // Fail fast. Ignoring expression could lead to data loss in case of deletions.
+ throw new UnsupportedOperationException();
+ }
+
+ if (type.isOrderable()) {
+ List orderedRanges = domain.getValues().getRanges().getOrderedRanges();
+ List values = new ArrayList<>();
+ List predicates = new ArrayList<>();
+ for (Range range : orderedRanges) {
+ if (range.isSingleValue()) {
+ values.add(getLiteralValue(type, range.getLowBoundedValue()));
+ } else {
+ predicates.add(toPredicate(columnIndex, range));
+ }
+ }
+
+ if (!values.isEmpty()) {
+ predicates.add(builder.in(columnIndex, values));
+ }
+
+ if (domain.isNullAllowed()) {
+ predicates.add(builder.isNull(columnIndex));
+ }
+ return PredicateBuilder.or(predicates);
+ }
+
+ throw new UnsupportedOperationException();
+ }
+
+ private Predicate toPredicate(int columnIndex, Range range) {
+ Type type = range.getType();
+
+ if (range.isSingleValue()) {
+ Object value = getLiteralValue(type, range.getSingleValue());
+ return builder.equal(columnIndex, value);
+ }
+
+ List conjuncts = new ArrayList<>(2);
+ if (!range.isLowUnbounded()) {
+ Object low = getLiteralValue(type, range.getLowBoundedValue());
+ Predicate lowBound;
+ if (range.isLowInclusive()) {
+ lowBound = builder.greaterOrEqual(columnIndex, low);
+ } else {
+ lowBound = builder.greaterThan(columnIndex, low);
+ }
+ conjuncts.add(lowBound);
+ }
+
+ if (!range.isHighUnbounded()) {
+ Object high = getLiteralValue(type, range.getHighBoundedValue());
+ Predicate highBound;
+ if (range.isHighInclusive()) {
+ highBound = builder.lessOrEqual(columnIndex, high);
+ } else {
+ highBound = builder.lessThan(columnIndex, high);
+ }
+ conjuncts.add(highBound);
+ }
+
+ return PredicateBuilder.and(conjuncts);
+ }
+
+ private Object getLiteralValue(Type type, Object prestoNativeValue) {
+ Objects.requireNonNull(prestoNativeValue, "prestoNativeValue is null");
+
+ if (type instanceof BooleanType) {
+ return prestoNativeValue;
+ }
+
+ if (type instanceof IntegerType) {
+ return Math.toIntExact((long) prestoNativeValue);
+ }
+
+ if (type instanceof BigintType) {
+ return prestoNativeValue;
+ }
+
+ if (type instanceof RealType) {
+ return Float.intBitsToFloat(Math.toIntExact((long) prestoNativeValue));
+ }
+
+ if (type instanceof DoubleType) {
+ return prestoNativeValue;
+ }
+
+ if (type instanceof DateType) {
+ return Math.toIntExact(((Long) prestoNativeValue));
+ }
+
+ if (type instanceof TimestampType || type instanceof TimeType) {
+ return TimeUnit.MILLISECONDS.toMicros((Long) prestoNativeValue);
+ }
+
+ if (type instanceof VarcharType || type instanceof CharType) {
+ return BinaryString.fromBytes(((Slice) prestoNativeValue).getBytes());
+ }
+
+ if (type instanceof VarbinaryType) {
+ return ((Slice) prestoNativeValue).getBytes();
+ }
+
+ if (type instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType) type;
+ Object value =
+ Objects.requireNonNull(
+ prestoNativeValue, "The prestoNativeValue must be non-null");
+ if (Decimals.isShortDecimal(decimalType)) {
+ Preconditions.checkArgument(
+ value instanceof Long,
+ "A short decimal should be represented by a Long value but was %s",
+ value.getClass().getName());
+ return BigDecimal.valueOf((long) value).movePointLeft(decimalType.getScale());
+ }
+ Preconditions.checkArgument(
+ value instanceof Slice,
+ "A long decimal should be represented by a Slice value but was %s",
+ value.getClass().getName());
+ return new BigDecimal(
+ Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale());
+ }
+
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java
new file mode 100644
index 0000000000000..538ea5cd87919
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+/** Presto {@link ConnectorHandleResolver}. */
+public class PaimonHandleResolver implements ConnectorHandleResolver {
+
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass() {
+ return PaimonTableHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
+ return PaimonTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass() {
+ return PaimonColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass() {
+ return PaimonSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass() {
+ return PaimonTransactionHandle.class;
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java
new file mode 100644
index 0000000000000..09d6e712d6fc9
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorNewTableLayout;
+import com.facebook.presto.spi.ConnectorOutputTableHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorOutputMetadata;
+import com.facebook.presto.spi.statistics.ComputedStatistics;
+import io.airlift.slice.Slice;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.security.SecurityContext;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.StringUtils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import javax.inject.Inject;
+
+/** Presto {@link ConnectorMetadata}. */
+public class PaimonMetadata implements ConnectorMetadata {
+
+ private final Catalog catalog;
+ private final TypeManager typeManager;
+
+ @Inject
+ public PaimonMetadata(Options catalogOptions, TypeManager typeManager) {
+ try {
+ SecurityContext.install(catalogOptions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
+ this.typeManager = typeManager;
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session) {
+ return listSchemaNames();
+ }
+
+ private List listSchemaNames() {
+ return catalog.listDatabases();
+ }
+
+ @Override
+ public boolean schemaExists(ConnectorSession session, String schemaName) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(schemaName),
+ "schemaName cannot be null or empty");
+ return catalog.databaseExists(schemaName);
+ }
+
+ @Override
+ public void createSchema(
+ ConnectorSession session, String schemaName, Map properties) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(schemaName),
+ "schemaName cannot be null or empty");
+ try {
+ catalog.createDatabase(schemaName, true);
+ } catch (Catalog.DatabaseAlreadyExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void dropSchema(ConnectorSession session, String schemaName) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(schemaName),
+ "schemaName cannot be null or empty");
+ try {
+ catalog.dropDatabase(schemaName, true, true);
+ } catch (Catalog.DatabaseNotExistException | Catalog.DatabaseNotEmptyException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public PaimonTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+ return getTableHandle(tableName);
+ }
+
+ public PaimonTableHandle getTableHandle(SchemaTableName tableName) {
+ Identifier tablePath = new Identifier(tableName.getSchemaName(), tableName.getTableName());
+ byte[] serializedTable;
+ try {
+ serializedTable = InstantiationUtil.serializeObject(catalog.getTable(tablePath));
+ } catch (Catalog.TableNotExistException e) {
+ return null;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return new PaimonTableHandle(
+ tableName.getSchemaName(), tableName.getTableName(), serializedTable);
+ }
+
+ @Override
+ public List getTableLayouts(
+ ConnectorSession session,
+ ConnectorTableHandle table,
+ Constraint constraint,
+ Optional> desiredColumns) {
+ PaimonTableHandle handle = (PaimonTableHandle) table;
+ ConnectorTableLayout layout =
+ new ConnectorTableLayout(
+ new PaimonTableLayoutHandle(handle, constraint.getSummary()));
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(
+ ConnectorSession session, ConnectorTableLayoutHandle handle) {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(
+ ConnectorSession session, ConnectorTableHandle table) {
+ return ((PaimonTableHandle) table).tableMetadata(typeManager);
+ }
+
+ @Override
+ public Map getColumnHandles(
+ ConnectorSession session, ConnectorTableHandle tableHandle) {
+ PaimonTableHandle table = (PaimonTableHandle) tableHandle;
+ Map handleMap = new HashMap<>();
+ for (ColumnMetadata column : table.columnMetadatas(typeManager)) {
+ handleMap.put(column.getName(), table.columnHandle(column.getName(), typeManager));
+ }
+ return handleMap;
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(
+ ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+ return ((PaimonColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, Optional schemaName) {
+ List tables = new ArrayList<>();
+ schemaName
+ .map(Collections::singletonList)
+ .orElseGet(catalog::listDatabases)
+ .forEach(schema -> tables.addAll(listTables(schema)));
+ return tables;
+ }
+
+ private List listTables(String schema) {
+ try {
+ return catalog.listTables(schema).stream()
+ .map(table -> new SchemaTableName(schema, table))
+ .collect(toList());
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // TODO: Need to insert method,the pr of presto writer will do this
+ @Override
+ public ConnectorOutputTableHandle beginCreateTable(
+ ConnectorSession session,
+ ConnectorTableMetadata tableMetadata,
+ Optional layout) {
+ return null;
+ }
+
+ @Override
+ public void createTable(
+ ConnectorSession session,
+ ConnectorTableMetadata tableMetadata,
+ boolean ignoreExisting) {
+ SchemaTableName table = tableMetadata.getTable();
+ Identifier identifier = Identifier.create(table.getSchemaName(), table.getTableName());
+
+ try {
+ catalog.createTable(identifier, prepareSchema(tableMetadata), true);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new RuntimeException(format("table already existed: '%s'", table.getTableName()));
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(format("database not exists: '%s'", table.getSchemaName()));
+ }
+ }
+
+ // TODO
+ @Override
+ public Optional finishCreateTable(
+ ConnectorSession session,
+ ConnectorOutputTableHandle tableHandle,
+ Collection fragments,
+ Collection computedStatistics) {
+ return Optional.empty();
+ }
+
+ // TODO: options is not set
+ private Schema prepareSchema(ConnectorTableMetadata tableMetadata) {
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .primaryKey(getPrimaryKeys(tableMetadata.getProperties()))
+ .partitionKeys(getPartitionedKeys(tableMetadata.getProperties()));
+
+ for (ColumnMetadata column : tableMetadata.getColumns()) {
+ builder.column(
+ column.getName(),
+ PaimonTypeUtils.toPaimonType(column.getType()),
+ column.getComment());
+ }
+ return builder.build();
+ }
+
+ private List getPartitionedKeys(Map tableProperties) {
+ List partitionedKeys =
+ (List) tableProperties.get(CoreOptions.PARTITION.key());
+ return partitionedKeys == null ? ImmutableList.of() : ImmutableList.copyOf(partitionedKeys);
+ }
+
+ private List getPrimaryKeys(Map tableProperties) {
+ List primaryKeys =
+ (List) tableProperties.get(CoreOptions.PRIMARY_KEY.key());
+ return primaryKeys == null ? ImmutableList.of() : ImmutableList.copyOf(primaryKeys);
+ }
+
+ @Override
+ public void renameTable(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ SchemaTableName newTableName) {
+ PaimonTableHandle oldTableHandle = (PaimonTableHandle) tableHandle;
+ try {
+ catalog.renameTable(
+ new Identifier(oldTableHandle.getSchemaName(), oldTableHandle.getTableName()),
+ new Identifier(newTableName.getSchemaName(), newTableName.getTableName()),
+ true);
+ } catch (Catalog.TableNotExistException | Catalog.TableAlreadyExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) {
+ PaimonTableHandle paimonTableHandle = (PaimonTableHandle) tableHandle;
+ try {
+ catalog.dropTable(
+ new Identifier(
+ paimonTableHandle.getSchemaName(), paimonTableHandle.getTableName()),
+ true);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Map> listTableColumns(
+ ConnectorSession session, SchemaTablePrefix prefix) {
+ requireNonNull(prefix, "prefix is null");
+ List tableNames;
+ if (prefix.getTableName() != null) {
+ tableNames = Collections.singletonList(prefix.toSchemaTableName());
+ } else {
+ tableNames = listTables(session, Optional.of(prefix.getSchemaName()));
+ }
+
+ return tableNames.stream()
+ .collect(
+ toMap(
+ Function.identity(),
+ table ->
+ getTableHandle(session, table)
+ .columnMetadatas(typeManager)));
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java
new file mode 100644
index 0000000000000..51d8733073b7c
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+import org.apache.paimon.options.Options;
+
+import java.util.Map;
+
+/** Module for binding instance. */
+public class PaimonModule implements Module {
+
+ private final String connectorId;
+ private final TypeManager typeManager;
+ private final FunctionMetadataManager functionMetadataManager;
+ private final StandardFunctionResolution standardFunctionResolution;
+ private final RowExpressionService rowExpressionService;
+ private final Map config;
+
+ public PaimonModule(
+ String connectorId,
+ TypeManager typeManager,
+ FunctionMetadataManager functionMetadataManager,
+ StandardFunctionResolution standardFunctionResolution,
+ RowExpressionService rowExpressionService,
+ Map config) {
+ this.connectorId = requireNonNull(connectorId, "catalogName is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.functionMetadataManager = functionMetadataManager;
+ this.standardFunctionResolution = standardFunctionResolution;
+ this.rowExpressionService = rowExpressionService;
+ this.config = config;
+ }
+
+ @Override
+ public void configure(Binder binder) {
+ binder.bind(PaimonConnectorId.class).toInstance(new PaimonConnectorId(connectorId));
+ binder.bind(TypeManager.class).toInstance(typeManager);
+ binder.bind(PaimonConnector.class).in(Scopes.SINGLETON);
+ binder.bind(PaimonMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(PaimonSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(PaimonPageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(FunctionMetadataManager.class).toInstance(functionMetadataManager);
+ binder.bind(StandardFunctionResolution.class).toInstance(standardFunctionResolution);
+ binder.bind(RowExpressionService.class).toInstance(rowExpressionService);
+ binder.bind(Options.class).toInstance(Options.fromMap(config));
+ binder.bind(PaimonTransactionManager.class).in(Scopes.SINGLETON);
+
+ configBinder(binder).bindConfig(PaimonConfig.class);
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java
new file mode 100644
index 0000000000000..908086e1b1c4c
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.DateType.DATE;
+import static com.facebook.presto.common.type.Decimals.encodeShortScaledValue;
+import static com.facebook.presto.common.type.Decimals.isLongDecimal;
+import static com.facebook.presto.common.type.Decimals.isShortDecimal;
+import static com.facebook.presto.common.type.IntegerType.INTEGER;
+import static com.facebook.presto.common.type.RealType.REAL;
+import static com.facebook.presto.common.type.SmallintType.SMALLINT;
+import static com.facebook.presto.common.type.TimeType.TIME;
+import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+import static com.facebook.presto.common.type.TinyintType.TINYINT;
+import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.airlift.slice.Slices.wrappedBuffer;
+import static java.lang.String.format;
+
+import com.facebook.presto.common.Page;
+import com.facebook.presto.common.PageBuilder;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.type.ArrayType;
+import com.facebook.presto.common.type.CharType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.Decimals;
+import com.facebook.presto.common.type.MapType;
+import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.PrestoException;
+import io.airlift.slice.Slice;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.shade.guava30.com.google.common.base.Verify;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.InternalRowUtils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.math.BigDecimal;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/** Presto {@link ConnectorPageSource}. */
+public class PaimonPageSource implements ConnectorPageSource {
+
+ private static final int ROWS_PER_REQUEST = 4096;
+ private final CloseableIterator iterator;
+ private final PageBuilder pageBuilder;
+ private final List prestoColumnTypes;
+ private final List paimonColumnTypes;
+ private boolean isFinished = false;
+ private long numReturn = 0;
+
+ public PaimonPageSource(RecordReader reader, List projectedColumns) {
+ this.iterator = reader.toCloseableIterator();
+ this.prestoColumnTypes = new ArrayList<>();
+ this.paimonColumnTypes = new ArrayList<>();
+ for (ColumnHandle handle : projectedColumns) {
+ PaimonColumnHandle paimonColumnHandle = (PaimonColumnHandle) handle;
+ PaimonPageSource.this.prestoColumnTypes.add(paimonColumnHandle.getPrestoType());
+ PaimonPageSource.this.paimonColumnTypes.add(paimonColumnHandle.paimonType());
+ }
+
+ this.pageBuilder = new PageBuilder(PaimonPageSource.this.prestoColumnTypes);
+ }
+
+ private static void writeSlice(BlockBuilder output, Type type, Object value) {
+ if (type instanceof VarcharType || type instanceof CharType) {
+ type.writeSlice(output, wrappedBuffer(((BinaryString) value).toBytes()));
+ } else if (type instanceof VarbinaryType) {
+ type.writeSlice(output, wrappedBuffer((byte[]) value));
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature());
+ }
+ }
+
+ private static void writeObject(BlockBuilder output, Type type, Object value) {
+ if (type instanceof DecimalType) {
+ Verify.verify(isLongDecimal(type), "The type should be long decimal");
+ DecimalType decimalType = (DecimalType) type;
+ BigDecimal decimal = ((Decimal) value).toBigDecimal();
+ type.writeSlice(output, Decimals.encodeScaledValue(decimal, decimalType.getScale()));
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ "Unhandled type for Object: " + type.getTypeSignature());
+ }
+ }
+
+ @Override
+ public long getCompletedBytes() {
+ return 0;
+ }
+
+ @Override
+ public long getCompletedPositions() {
+ return 0;
+ }
+
+ @Override
+ public long getReadTimeNanos() {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return isFinished;
+ }
+
+ @Override
+ public Page getNextPage() {
+ return ClassLoaderUtils.runWithContextClassLoader(
+ () -> {
+ try {
+ return nextPage();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ },
+ PaimonPageSource.class.getClassLoader());
+ }
+
+ @Override
+ public long getSystemMemoryUsage() {
+ return 0;
+ }
+
+ @Nullable
+ private Page nextPage() throws IOException {
+ int count = 0;
+ while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
+ if (!iterator.hasNext()) {
+ isFinished = true;
+ return returnPage(count);
+ }
+
+ InternalRow row = iterator.next();
+ pageBuilder.declarePosition();
+ count++;
+ for (int i = 0; i < prestoColumnTypes.size(); i++) {
+ BlockBuilder output = pageBuilder.getBlockBuilder(i);
+ appendTo(
+ prestoColumnTypes.get(i),
+ paimonColumnTypes.get(i),
+ InternalRowUtils.get(row, i, paimonColumnTypes.get(i)),
+ output);
+ }
+ }
+
+ return returnPage(count);
+ }
+
+ private Page returnPage(int count) {
+ if (count == 0) {
+ return null;
+ }
+ numReturn += count;
+ Page page = pageBuilder.build();
+ pageBuilder.reset();
+ return page;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ this.iterator.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void appendTo(Type prestoType, DataType paimonType, Object value, BlockBuilder output) {
+ if (value == null) {
+ output.appendNull();
+ return;
+ }
+
+ Class> javaType = prestoType.getJavaType();
+ if (javaType == boolean.class) {
+ prestoType.writeBoolean(output, (Boolean) value);
+ } else if (javaType == long.class) {
+ if (prestoType.equals(BIGINT)
+ || prestoType.equals(INTEGER)
+ || prestoType.equals(TINYINT)
+ || prestoType.equals(SMALLINT)
+ || prestoType.equals(DATE)) {
+ prestoType.writeLong(output, ((Number) value).longValue());
+ } else if (prestoType.equals(REAL)) {
+ prestoType.writeLong(output, Float.floatToIntBits((Float) value));
+ } else if (prestoType instanceof DecimalType) {
+ Verify.verify(isShortDecimal(prestoType), "The type should be short decimal");
+ DecimalType decimalType = (DecimalType) prestoType;
+ BigDecimal decimal = ((Decimal) value).toBigDecimal();
+ prestoType.writeLong(
+ output, encodeShortScaledValue(decimal, decimalType.getScale()));
+ } else if (prestoType.equals(TIMESTAMP)) {
+ prestoType.writeLong(
+ output,
+ ((Timestamp) value)
+ .toLocalDateTime()
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli());
+ } else if (prestoType.equals(TIME)) {
+ prestoType.writeLong(output, (int) value * 1_000);
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ format("Unhandled type for %s: %s", javaType.getSimpleName(), prestoType));
+ }
+ } else if (javaType == double.class) {
+ prestoType.writeDouble(output, ((Number) value).doubleValue());
+ } else if (prestoType instanceof DecimalType) {
+ writeObject(output, prestoType, value);
+ } else if (javaType == Slice.class) {
+ writeSlice(output, prestoType, value);
+ } else if (javaType == Block.class) {
+ writeBlock(output, prestoType, paimonType, value);
+ } else {
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ format("Unhandled type for %s: %s", javaType.getSimpleName(), prestoType));
+ }
+ }
+
+ private void writeBlock(
+ BlockBuilder output, Type prestoType, DataType paimonType, Object value) {
+ if (prestoType instanceof ArrayType) {
+ BlockBuilder builder = output.beginBlockEntry();
+
+ InternalArray arrayData = (InternalArray) value;
+ DataType elementType = DataTypeChecks.getNestedTypes(paimonType).get(0);
+ for (int i = 0; i < arrayData.size(); i++) {
+ appendTo(
+ prestoType.getTypeParameters().get(0),
+ elementType,
+ InternalRowUtils.get(arrayData, i, elementType),
+ builder);
+ }
+
+ output.closeEntry();
+ return;
+ }
+ if (prestoType instanceof RowType) {
+ InternalRow rowData = (InternalRow) value;
+ BlockBuilder builder = output.beginBlockEntry();
+ for (int index = 0; index < prestoType.getTypeParameters().size(); index++) {
+ Type fieldType = prestoType.getTypeParameters().get(index);
+ DataType fieldLogicalType =
+ ((org.apache.paimon.types.RowType) paimonType).getTypeAt(index);
+ appendTo(
+ fieldType,
+ fieldLogicalType,
+ InternalRowUtils.get(rowData, index, fieldLogicalType),
+ builder);
+ }
+ output.closeEntry();
+ return;
+ }
+ if (prestoType instanceof MapType) {
+ InternalMap mapData = (InternalMap) value;
+ InternalArray keyArray = mapData.keyArray();
+ InternalArray valueArray = mapData.valueArray();
+ DataType keyType = ((org.apache.paimon.types.MapType) paimonType).getKeyType();
+ DataType valueType = ((org.apache.paimon.types.MapType) paimonType).getValueType();
+ BlockBuilder builder = output.beginBlockEntry();
+ for (int i = 0; i < keyArray.size(); i++) {
+ appendTo(
+ prestoType.getTypeParameters().get(0),
+ keyType,
+ InternalRowUtils.get(keyArray, i, keyType),
+ builder);
+ appendTo(
+ prestoType.getTypeParameters().get(1),
+ valueType,
+ InternalRowUtils.get(valueArray, i, valueType),
+ builder);
+ }
+ output.closeEntry();
+ return;
+ }
+ throw new PrestoException(
+ GENERIC_INTERNAL_ERROR,
+ "Unhandled type for Block: " + prestoType.getTypeSignature());
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java
new file mode 100644
index 0000000000000..9c55f9bc1b61f
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+
+import static com.facebook.presto.paimon.ClassLoaderUtils.runWithContextClassLoader;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.SplitContext;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Presto {@link ConnectorPageSourceProvider}. */
+public class PaimonPageSourceProvider implements ConnectorPageSourceProvider {
+
+ @Override
+ public ConnectorPageSource createPageSource(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorSplit split,
+ ConnectorTableLayoutHandle layout,
+ List columns,
+ SplitContext splitContext) {
+ return runWithContextClassLoader(
+ () ->
+ createPageSource(
+ ((PaimonTableLayoutHandle) layout).getTableHandle(),
+ (PaimonSplit) split,
+ columns),
+ PaimonPageSourceProvider.class.getClassLoader());
+ }
+
+ private ConnectorPageSource createPageSource(
+ PaimonTableHandle tableHandle, PaimonSplit split, List columns) {
+ Table table = tableHandle.table();
+ ReadBuilder read = table.newReadBuilder();
+ RowType rowType = table.rowType();
+ List fieldNames = FieldNameUtils.fieldNames(rowType);
+ List projectedFields =
+ columns.stream()
+ .map(PaimonColumnHandle.class::cast)
+ .map(PaimonColumnHandle::getColumnName)
+ .collect(Collectors.toList());
+ if (!fieldNames.equals(projectedFields)) {
+ int[] projected = projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
+ read.withProjection(projected);
+ }
+
+ new PaimonFilterConverter(rowType)
+ .convert(tableHandle.getFilter())
+ .ifPresent(read::withFilter);
+
+ try {
+ return new PaimonPageSource(read.newRead().createReader(split.decodeSplit()), columns);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java
new file mode 100644
index 0000000000000..fbffc45e73e71
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+
+import java.util.Collections;
+
+/** Presto {@link Plugin}. */
+public class PaimonPlugin implements Plugin {
+ @Override
+ public Iterable getConnectorFactories() {
+ return Collections.singletonList(new PaimonConnectorFactory());
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java
new file mode 100644
index 0000000000000..c010578609adc
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
+
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.NodeProvider;
+import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.table.source.Split;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Presto {@link ConnectorSplit}. */
+public class PaimonSplit implements ConnectorSplit {
+
+ private final String splitSerialized;
+
+ @JsonCreator
+ public PaimonSplit(@JsonProperty("splitSerialized") String splitSerialized) {
+ this.splitSerialized = splitSerialized;
+ }
+
+ public static PaimonSplit fromSplit(Split split) {
+ return new PaimonSplit(EncodingUtils.encodeObjectToString(split));
+ }
+
+ @Override
+ public List getPreferredNodes(NodeProvider nodeProvider) {
+ return ImmutableList.of();
+ }
+
+ public Split decodeSplit() {
+ return EncodingUtils.decodeStringToObject(splitSerialized);
+ }
+
+ @JsonProperty
+ public String getSplitSerialized() {
+ return splitSerialized;
+ }
+
+ @Override
+ public NodeSelectionStrategy getNodeSelectionStrategy() {
+ return NO_PREFERENCE;
+ }
+
+ @Override
+ public Object getInfo() {
+ return Collections.emptyMap();
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java
new file mode 100644
index 0000000000000..5f802fac36e86
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Presto {@link ConnectorSplitManager}. */
+public class PaimonSplitManager implements ConnectorSplitManager {
+
+ @Override
+ public ConnectorSplitSource getSplits(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorTableLayoutHandle layout,
+ SplitSchedulingContext splitSchedulingContext) {
+
+ PaimonTableHandle tableHandle = ((PaimonTableLayoutHandle) layout).getTableHandle();
+ Table table = tableHandle.table();
+ ReadBuilder readBuilder = table.newReadBuilder();
+ new PaimonFilterConverter(table.rowType())
+ .convert(tableHandle.getFilter())
+ .ifPresent(readBuilder::withFilter);
+ List splits = readBuilder.newScan().plan().splits();
+ return new PaimonSplitSource(
+ splits.stream().map(PaimonSplit::fromSplit).collect(Collectors.toList()));
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java
new file mode 100644
index 0000000000000..9690c15cd3737
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.connector.ConnectorPartitionHandle;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/** Presto {@link ConnectorSplitSource}. */
+public class PaimonSplitSource implements ConnectorSplitSource {
+
+ private final Queue splits;
+
+ public PaimonSplitSource(List splits) {
+ this.splits = new LinkedList<>(splits);
+ }
+
+ @Override
+ public CompletableFuture getNextBatch(
+ ConnectorPartitionHandle partitionHandle, int maxSize) {
+ List batch = new ArrayList<>();
+ for (int i = 0; i < maxSize; i++) {
+ PaimonSplit split = splits.poll();
+ if (split == null) {
+ break;
+ }
+ batch.add(split);
+ }
+ return CompletableFuture.completedFuture(new ConnectorSplitBatch(batch, isFinished()));
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return splits.isEmpty();
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java
new file mode 100644
index 0000000000000..a41b71e724d03
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Presto {@link ConnectorTableHandle}. */
+public class PaimonTableHandle implements ConnectorTableHandle {
+
+ private final String schemaName;
+ private final String tableName;
+ private final byte[] serializedTable;
+ private final TupleDomain filter;
+ private final Optional> projectedColumns;
+
+ private Table lazyTable;
+
+ public PaimonTableHandle(String schemaName, String tableName, byte[] serializedTable) {
+ this(schemaName, tableName, serializedTable, TupleDomain.all(), Optional.empty());
+ }
+
+ @JsonCreator
+ public PaimonTableHandle(
+ @JsonProperty("schemaName") String schemaName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("serializedTable") byte[] serializedTable,
+ @JsonProperty("filter") TupleDomain filter,
+ @JsonProperty("projection") Optional> projectedColumns) {
+ this.schemaName = schemaName;
+ this.tableName = tableName;
+ this.serializedTable = serializedTable;
+ this.filter = filter;
+ this.projectedColumns = projectedColumns;
+ }
+
+ @JsonProperty
+ public String getSchemaName() {
+ return schemaName;
+ }
+
+ @JsonProperty
+ public String getTableName() {
+ return tableName;
+ }
+
+ @JsonProperty
+ public byte[] getSerializedTable() {
+ return serializedTable;
+ }
+
+ @JsonProperty
+ public TupleDomain getFilter() {
+ return filter;
+ }
+
+ @JsonProperty
+ public Optional> getProjectedColumns() {
+ return projectedColumns;
+ }
+
+ public PaimonTableHandle copy(TupleDomain filter) {
+ return new PaimonTableHandle(
+ schemaName, tableName, serializedTable, filter, projectedColumns);
+ }
+
+ public PaimonTableHandle copy(Optional> projectedColumns) {
+ return new PaimonTableHandle(
+ schemaName, tableName, serializedTable, filter, projectedColumns);
+ }
+
+ public Table table() {
+ if (lazyTable == null) {
+ try {
+ lazyTable =
+ InstantiationUtil.deserializeObject(
+ serializedTable, this.getClass().getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return lazyTable;
+ }
+
+ public ConnectorTableMetadata tableMetadata(TypeManager typeManager) {
+ return new ConnectorTableMetadata(
+ new SchemaTableName(schemaName, tableName), columnMetadatas(typeManager));
+ }
+
+ public List columnMetadatas(TypeManager typeManager) {
+ return table().rowType().getFields().stream()
+ .map(
+ column ->
+ new ColumnMetadata(
+ column.name(),
+ Objects.requireNonNull(
+ PaimonTypeUtils.toPrestoType(
+ column.type(), typeManager))))
+ .collect(Collectors.toList());
+ }
+
+ public PaimonColumnHandle columnHandle(String field, TypeManager typeManager) {
+ List fieldNames = FieldNameUtils.fieldNames(table().rowType());
+ int index = fieldNames.indexOf(field);
+ if (index == -1) {
+ throw new RuntimeException(
+ String.format("Cannot find field %s in schema %s", field, fieldNames));
+ }
+ return PaimonColumnHandle.create(field, table().rowType().getTypeAt(index), typeManager);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonTableHandle that = (PaimonTableHandle) o;
+ return Arrays.equals(serializedTable, that.serializedTable)
+ && Objects.equals(schemaName, that.schemaName)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(filter, that.filter)
+ && Objects.equals(projectedColumns, that.projectedColumns);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ schemaName, tableName, filter, projectedColumns, Arrays.hashCode(serializedTable));
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java
new file mode 100644
index 0000000000000..50e14a0aed1ab
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/** Presto {@link ConnectorTableLayoutHandle}. */
+public class PaimonTableLayoutHandle implements ConnectorTableLayoutHandle {
+
+ private final PaimonTableHandle tableHandle;
+ private final TupleDomain constraintSummary;
+
+ @JsonCreator
+ public PaimonTableLayoutHandle(
+ @JsonProperty("tableHandle") PaimonTableHandle tableHandle,
+ @JsonProperty("constraintSummary") TupleDomain constraintSummary) {
+ this.tableHandle = Objects.requireNonNull(tableHandle, "table is null");
+ this.constraintSummary = constraintSummary;
+ }
+
+ @JsonProperty
+ public PaimonTableHandle getTableHandle() {
+ return tableHandle;
+ }
+
+ @JsonProperty
+ public TupleDomain getConstraintSummary() {
+ return constraintSummary;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ PaimonTableLayoutHandle other = (PaimonTableLayoutHandle) obj;
+ return Objects.equals(tableHandle, other.tableHandle)
+ && Objects.equals(constraintSummary, other.constraintSummary);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableHandle, constraintSummary);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("tableHandle", tableHandle)
+ .add("constraintSummary", constraintSummary)
+ .toString();
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java
new file mode 100644
index 0000000000000..77b438433816d
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static java.util.Objects.requireNonNull;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Presto {@link ConnectorTransactionHandle}. */
+public class PaimonTransactionHandle implements ConnectorTransactionHandle {
+
+ private final UUID uuid;
+
+ public PaimonTransactionHandle() {
+ this(UUID.randomUUID());
+ }
+
+ public PaimonTransactionHandle(@JsonProperty("uuid") UUID uuid) {
+ this.uuid = requireNonNull(uuid, "uuid is null");
+ }
+
+ @JsonProperty
+ public UUID getUuid() {
+ return uuid;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+ PaimonTransactionHandle other = (PaimonTransactionHandle) obj;
+ return Objects.equals(uuid, other.uuid);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(uuid);
+ }
+
+ @Override
+ public String toString() {
+ return uuid.toString();
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java
new file mode 100644
index 0000000000000..2760b9d5d5d18
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import org.apache.paimon.shade.guava30.com.google.common.base.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Presto TransactionManager. */
+public class PaimonTransactionManager {
+
+ private final Map transactions =
+ new ConcurrentHashMap<>();
+
+ public ConnectorMetadata get(ConnectorTransactionHandle transaction) {
+ ConnectorMetadata metadata = transactions.get(transaction);
+ Preconditions.checkArgument(metadata != null, "no such transaction: %s", transaction);
+ return metadata;
+ }
+
+ public ConnectorMetadata remove(ConnectorTransactionHandle transaction) {
+ ConnectorMetadata metadata = transactions.remove(transaction);
+ Preconditions.checkArgument(metadata != null, "no such transaction: %s", transaction);
+ return metadata;
+ }
+
+ public void put(ConnectorTransactionHandle transaction, ConnectorMetadata metadata) {
+ ConnectorMetadata existing = transactions.putIfAbsent(transaction, metadata);
+ Preconditions.checkState(existing == null, "transaction already exists: %s", existing);
+ }
+}
diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java
new file mode 100644
index 0000000000000..b317e57b32b85
--- /dev/null
+++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import static java.lang.String.format;
+
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.SmallintType;
+import com.facebook.presto.common.type.StandardTypes;
+import com.facebook.presto.common.type.TimestampWithTimeZoneType;
+import com.facebook.presto.common.type.TinyintType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.common.type.TypeSignature;
+import com.facebook.presto.common.type.TypeSignatureParameter;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/** Presto type from Paimon Type. */
+public class PaimonTypeUtils {
+
+ private PaimonTypeUtils() {}
+
+ public static Type toPrestoType(DataType paimonType, TypeManager typeManager) {
+ if (paimonType instanceof CharType) {
+ return com.facebook.presto.common.type.CharType.createCharType(
+ Math.min(
+ com.facebook.presto.common.type.CharType.MAX_LENGTH,
+ ((CharType) paimonType).getLength()));
+ } else if (paimonType instanceof VarCharType) {
+ return VarcharType.createUnboundedVarcharType();
+ } else if (paimonType instanceof BooleanType) {
+ return com.facebook.presto.common.type.BooleanType.BOOLEAN;
+ } else if (paimonType instanceof BinaryType) {
+ return VarbinaryType.VARBINARY;
+ } else if (paimonType instanceof VarBinaryType) {
+ return VarbinaryType.VARBINARY;
+ } else if (paimonType instanceof DecimalType) {
+ return com.facebook.presto.common.type.DecimalType.createDecimalType(
+ ((DecimalType) paimonType).getPrecision(),
+ ((DecimalType) paimonType).getScale());
+ } else if (paimonType instanceof TinyIntType) {
+ return TinyintType.TINYINT;
+ } else if (paimonType instanceof SmallIntType) {
+ return SmallintType.SMALLINT;
+ } else if (paimonType instanceof IntType) {
+ return IntegerType.INTEGER;
+ } else if (paimonType instanceof BigIntType) {
+ return BigintType.BIGINT;
+ } else if (paimonType instanceof FloatType) {
+ return RealType.REAL;
+ } else if (paimonType instanceof DoubleType) {
+ return com.facebook.presto.common.type.DoubleType.DOUBLE;
+ } else if (paimonType instanceof DateType) {
+ return com.facebook.presto.common.type.DateType.DATE;
+ } else if (paimonType instanceof TimeType) {
+ return com.facebook.presto.common.type.TimeType.TIME;
+ } else if (paimonType instanceof TimestampType) {
+ return com.facebook.presto.common.type.TimestampType.TIMESTAMP;
+ } else if (paimonType instanceof LocalZonedTimestampType) {
+ return TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+ } else if (paimonType instanceof ArrayType) {
+ DataType elementType = ((ArrayType) paimonType).getElementType();
+ return new com.facebook.presto.common.type.ArrayType(
+ Objects.requireNonNull(toPrestoType(elementType, typeManager)));
+ } else if (paimonType instanceof MapType) {
+ MapType paimonMapType = (MapType) paimonType;
+ TypeSignature keyType =
+ Objects.requireNonNull(toPrestoType(paimonMapType.getKeyType(), typeManager))
+ .getTypeSignature();
+ TypeSignature valueType =
+ Objects.requireNonNull(toPrestoType(paimonMapType.getValueType(), typeManager))
+ .getTypeSignature();
+ return typeManager.getParameterizedType(
+ StandardTypes.MAP,
+ ImmutableList.of(
+ TypeSignatureParameter.of(keyType),
+ TypeSignatureParameter.of(valueType)));
+ } else if (paimonType instanceof RowType) {
+ RowType rowType = (RowType) paimonType;
+ List fields =
+ rowType.getFields().stream()
+ .map(
+ field ->
+ com.facebook.presto.common.type.RowType.field(
+ field.name(),
+ toPrestoType(field.type(), typeManager)))
+ .collect(Collectors.toList());
+ return com.facebook.presto.common.type.RowType.from(fields);
+ } else {
+ throw new UnsupportedOperationException(
+ format("Cannot convert from Paimon type '%s' to Presto type", paimonType));
+ }
+ }
+
+ public static DataType toPaimonType(Type prestoType) {
+ if (prestoType instanceof com.facebook.presto.common.type.CharType) {
+ return DataTypes.CHAR(
+ ((com.facebook.presto.common.type.CharType) prestoType).getLength());
+ } else if (prestoType instanceof VarcharType) {
+ return DataTypes.VARCHAR(
+ Math.min(Integer.MAX_VALUE, ((VarcharType) prestoType).getLength()));
+ } else if (prestoType instanceof com.facebook.presto.common.type.BooleanType) {
+ return DataTypes.BOOLEAN();
+ } else if (prestoType instanceof VarbinaryType) {
+ // The varbinary in Presto currently does not accept the maximum length parameter, it is
+ // unbounded
+ return DataTypes.VARBINARY(Integer.MAX_VALUE);
+ } else if (prestoType instanceof com.facebook.presto.common.type.DecimalType) {
+ return DataTypes.DECIMAL(
+ ((com.facebook.presto.common.type.DecimalType) prestoType).getPrecision(),
+ ((com.facebook.presto.common.type.DecimalType) prestoType).getScale());
+ } else if (prestoType instanceof TinyintType) {
+ return DataTypes.TINYINT();
+ } else if (prestoType instanceof SmallintType) {
+ return DataTypes.SMALLINT();
+ } else if (prestoType instanceof IntegerType) {
+ return DataTypes.INT();
+ } else if (prestoType instanceof BigintType) {
+ return DataTypes.BIGINT();
+ } else if (prestoType instanceof RealType) {
+ return DataTypes.FLOAT();
+ } else if (prestoType instanceof com.facebook.presto.common.type.DoubleType) {
+ return DataTypes.DOUBLE();
+ } else if (prestoType instanceof com.facebook.presto.common.type.DateType) {
+ return DataTypes.DATE();
+ } else if (prestoType instanceof com.facebook.presto.common.type.TimeType) {
+ return new TimeType();
+ } else if (prestoType instanceof com.facebook.presto.common.type.TimestampType) {
+ return DataTypes.TIMESTAMP();
+ } else if (prestoType instanceof TimestampWithTimeZoneType) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
+ } else if (prestoType instanceof com.facebook.presto.common.type.ArrayType) {
+ return DataTypes.ARRAY(
+ toPaimonType(
+ ((com.facebook.presto.common.type.ArrayType) prestoType)
+ .getElementType()));
+ } else if (prestoType instanceof com.facebook.presto.common.type.MapType) {
+ return DataTypes.MAP(
+ toPaimonType(
+ ((com.facebook.presto.common.type.MapType) prestoType).getKeyType()),
+ toPaimonType(
+ ((com.facebook.presto.common.type.MapType) prestoType).getValueType()));
+ } else if (prestoType instanceof com.facebook.presto.common.type.RowType) {
+ com.facebook.presto.common.type.RowType rowType =
+ (com.facebook.presto.common.type.RowType) prestoType;
+ AtomicInteger id = new AtomicInteger(0);
+ List dataFields =
+ rowType.getFields().stream()
+ .map(
+ field ->
+ new DataField(
+ id.getAndIncrement(),
+ field.getName().get(),
+ toPaimonType(field.getType())))
+ .collect(Collectors.toList());
+ return new RowType(true, dataFields);
+ } else {
+ throw new UnsupportedOperationException(
+ format("Cannot convert from Presto type '%s' to Paimon type", prestoType));
+ }
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java
new file mode 100644
index 0000000000000..40c41d5e8fe0d
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.airlift.json.JsonCodecFactory;
+import com.facebook.airlift.json.JsonObjectMapperProvider;
+import com.facebook.airlift.json.ObjectMapperProvider;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.type.TypeDeserializer;
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
+import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature;
+import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonColumnHandle}. */
+public class PaimonColumnHandleTest {
+
+ @Test
+ public void testPrestoColumnHandle() {
+ VarCharType varCharType = VarCharType.stringType(true);
+ PaimonColumnHandle expected =
+ new PaimonColumnHandle(
+ "name",
+ JsonSerdeUtil.toJson(varCharType),
+ createTestFunctionAndTypeManager().getType(parseTypeSignature(VARCHAR)));
+ testRoundTrip(expected);
+ }
+
+ private void testRoundTrip(PaimonColumnHandle expected) {
+ ObjectMapperProvider objectMapperProvider = new JsonObjectMapperProvider();
+ objectMapperProvider.setJsonDeserializers(
+ ImmutableMap.of(
+ Type.class, new TypeDeserializer(createTestFunctionAndTypeManager())));
+ JsonCodec codec =
+ new JsonCodecFactory(objectMapperProvider).jsonCodec(PaimonColumnHandle.class);
+ String json = codec.toJson(expected);
+ PaimonColumnHandle actual = codec.fromJson(json);
+ assertThat(actual).isEqualTo(expected);
+ assertThat(actual.getColumnName()).isEqualTo(expected.getColumnName());
+ assertThat(actual.paimonType()).isEqualTo(expected.paimonType());
+ assertThat(actual.getPrestoType()).isEqualTo(expected.getPrestoType());
+ assertThat(actual.getTypeString()).isEqualTo(expected.getTypeString());
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java
new file mode 100644
index 0000000000000..b91ce8526176f
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.facebook.presto.testing.TestingConnectorContext;
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonPlugin}. */
+public class PaimonPluginTest {
+
+ @Test
+ public void testCreatePaimonConnector() throws IOException {
+ String warehouse =
+ Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+ Plugin plugin = new PaimonPlugin();
+ ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
+ Connector connector =
+ factory.create(
+ "paimon",
+ ImmutableMap.of("warehouse", warehouse),
+ new TestingConnectorContext());
+ assertThat(connector).isNotNull();
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java
new file mode 100644
index 0000000000000..0e1d5dea5e83a
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.airlift.log.Logging;
+import com.facebook.presto.Session;
+import com.facebook.presto.tests.DistributedQueryRunner;
+import com.facebook.presto.tpch.TpchPlugin;
+import com.google.common.collect.ImmutableMap;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
+
+/** The query runner of paimon. */
+public class PaimonQueryRunner {
+
+ private static final Logger LOG = Logger.get(PaimonQueryRunner.class);
+
+ private static final String PAIMON_CATALOG = "paimon";
+
+ private PaimonQueryRunner() {}
+
+ public static DistributedQueryRunner createPrestoQueryRunner(
+ Map extraProperties) throws Exception {
+ return createPrestoQueryRunner(extraProperties, ImmutableMap.of(), false);
+ }
+
+ public static DistributedQueryRunner createPrestoQueryRunner(
+ Map extraProperties,
+ Map extraConnectorProperties,
+ boolean createTpchTables)
+ throws Exception {
+
+ Session session = testSessionBuilder().setCatalog(PAIMON_CATALOG).setSchema("tpch").build();
+
+ DistributedQueryRunner queryRunner =
+ DistributedQueryRunner.builder(session).setExtraProperties(extraProperties).build();
+
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog("tpch", "tpch");
+
+ Path dataDir = queryRunner.getCoordinator().getDataDirectory().resolve("paimon_data");
+ Path catalogDir = dataDir.getParent().resolve("catalog");
+
+ queryRunner.installPlugin(new PaimonPlugin());
+
+ Map options =
+ ImmutableMap.builder()
+ .put("warehouse", catalogDir.toFile().toURI().toString())
+ .putAll(extraConnectorProperties)
+ .build();
+
+ queryRunner.createCatalog(PAIMON_CATALOG, PAIMON_CATALOG, options);
+
+ queryRunner.execute("CREATE SCHEMA tpch");
+
+ // TODO
+ /*if (createTpchTables) {
+ copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, session, TpchTable.getTables());
+ }*/
+
+ return queryRunner;
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ Logging.initialize();
+ Map properties =
+ com.google.common.collect.ImmutableMap.of("http-server.http.port", "8080");
+ DistributedQueryRunner queryRunner = null;
+ try {
+ queryRunner = createPrestoQueryRunner(properties);
+ } catch (Throwable t) {
+ LOG.error(t);
+ System.exit(1);
+ }
+ TimeUnit.MILLISECONDS.sleep(10);
+ Logger log = Logger.get(PaimonQueryRunner.class);
+ log.info("======== SERVER STARTED ========");
+ log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java
new file mode 100644
index 0000000000000..dc1753fb061a6
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.airlift.json.JsonCodec;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonSplit}. */
+public class PaimonSplitTest {
+
+ private final JsonCodec codec = JsonCodec.jsonCodec(PaimonSplit.class);
+
+ @Test
+ public void testJsonRoundTrip() throws Exception {
+ byte[] serializedTable = TestPrestoUtils.getSerializedTable();
+ PaimonSplit expected = new PaimonSplit(Arrays.toString(serializedTable));
+ String json = codec.toJson(expected);
+ PaimonSplit actual = codec.fromJson(json);
+ assertThat(actual.getSplitSerialized()).isEqualTo(expected.getSplitSerialized());
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java
new file mode 100644
index 0000000000000..5a233605a9537
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import com.facebook.airlift.json.JsonCodec;
+import com.facebook.presto.common.predicate.TupleDomain;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonTableHandle}. */
+public class PaimonTableHandleTest {
+
+ private final JsonCodec codec = JsonCodec.jsonCodec(PaimonTableHandle.class);
+
+ @Test
+ public void testPrestoTableHandle() throws Exception {
+ byte[] serializedTable = TestPrestoUtils.getSerializedTable();
+ PaimonTableHandle expected =
+ new PaimonTableHandle(
+ "test", "user", serializedTable, TupleDomain.all(), Optional.empty());
+ testRoundTrip(expected);
+ }
+
+ private void testRoundTrip(PaimonTableHandle expected) {
+ String json = codec.toJson(expected);
+ PaimonTableHandle actual = codec.fromJson(json);
+ assertThat(actual).isEqualTo(expected);
+ assertThat(actual.getSchemaName()).isEqualTo(expected.getSchemaName());
+ assertThat(actual.getTableName()).isEqualTo(expected.getTableName());
+ assertThat(actual.getSerializedTable()).isEqualTo(expected.getSerializedTable());
+ assertThat(actual.getFilter()).isEqualTo(expected.getFilter());
+ assertThat(actual.getProjectedColumns()).isEqualTo(expected.getProjectedColumns());
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java
new file mode 100644
index 0000000000000..ae5bc418479f3
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.VarCharType;
+
+import com.facebook.presto.common.type.ArrayType;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.CharType;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.DoubleType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.SmallintType;
+import com.facebook.presto.common.type.StandardTypes;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.TimestampWithTimeZoneType;
+import com.facebook.presto.common.type.TinyintType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.TypeSignatureParameter;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.metadata.FunctionAndTypeManager;
+import com.google.common.collect.ImmutableList;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PaimonTypeUtils}. */
+public class PaimonTypeTest {
+
+ @Test
+ public void testToPrestoType() {
+ Type charType =
+ PaimonTypeUtils.toPrestoType(DataTypes.CHAR(1), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(charType).getDisplayName()).isEqualTo("char(1)");
+
+ Type varCharType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.VARCHAR(10), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(varCharType).getDisplayName()).isEqualTo("varchar");
+
+ Type booleanType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.BOOLEAN(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(booleanType).getDisplayName()).isEqualTo("boolean");
+
+ Type binaryType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.BINARY(10), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(binaryType).getDisplayName()).isEqualTo("varbinary");
+
+ Type varBinaryType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.VARBINARY(10), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(varBinaryType).getDisplayName()).isEqualTo("varbinary");
+
+ assertThat(
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.DECIMAL(38, 0),
+ createTestFunctionAndTypeManager())
+ .getDisplayName())
+ .isEqualTo("decimal(38,0)");
+
+ org.apache.paimon.types.DecimalType decimal = DataTypes.DECIMAL(2, 2);
+ assertThat(
+ PaimonTypeUtils.toPrestoType(decimal, createTestFunctionAndTypeManager())
+ .getDisplayName())
+ .isEqualTo("decimal(2,2)");
+
+ Type tinyIntType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.TINYINT(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(tinyIntType).getDisplayName()).isEqualTo("tinyint");
+
+ Type smallIntType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.SMALLINT(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(smallIntType).getDisplayName()).isEqualTo("smallint");
+
+ Type intType =
+ PaimonTypeUtils.toPrestoType(DataTypes.INT(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(intType).getDisplayName()).isEqualTo("integer");
+
+ Type bigIntType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.BIGINT(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(bigIntType).getDisplayName()).isEqualTo("bigint");
+
+ Type doubleType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.DOUBLE(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(doubleType).getDisplayName()).isEqualTo("double");
+
+ Type dateType =
+ PaimonTypeUtils.toPrestoType(DataTypes.DATE(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(dateType).getDisplayName()).isEqualTo("date");
+
+ Type timeType =
+ PaimonTypeUtils.toPrestoType(new TimeType(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(timeType).getDisplayName()).isEqualTo("time");
+
+ Type timestampType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.TIMESTAMP(), createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(timestampType).getDisplayName()).isEqualTo("timestamp");
+
+ Type localZonedTimestampType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+ createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(localZonedTimestampType).getDisplayName())
+ .isEqualTo("timestamp with time zone");
+
+ Type mapType =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()),
+ createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(mapType).getDisplayName())
+ .isEqualTo("map(bigint, varchar)");
+
+ Type row =
+ PaimonTypeUtils.toPrestoType(
+ DataTypes.ROW(
+ new DataField(0, "id", new IntType()),
+ new DataField(1, "name", new VarCharType(Integer.MAX_VALUE))),
+ createTestFunctionAndTypeManager());
+ assertThat(Objects.requireNonNull(row).getDisplayName())
+ .isEqualTo("row(\"id\" integer, \"name\" varchar)");
+ }
+
+ @Test
+ public void testToPaimonType() {
+ DataType charType = PaimonTypeUtils.toPaimonType(CharType.createCharType(1));
+ assertThat(charType.asSQLString()).isEqualTo("CHAR(1)");
+
+ DataType varCharType =
+ PaimonTypeUtils.toPaimonType(VarcharType.createUnboundedVarcharType());
+ assertThat(varCharType.asSQLString()).isEqualTo("STRING");
+
+ DataType booleanType = PaimonTypeUtils.toPaimonType(BooleanType.BOOLEAN);
+ assertThat(booleanType.asSQLString()).isEqualTo("BOOLEAN");
+
+ DataType varbinaryType = PaimonTypeUtils.toPaimonType(VarbinaryType.VARBINARY);
+ assertThat(varbinaryType.asSQLString()).isEqualTo("BYTES");
+
+ DataType decimalType = PaimonTypeUtils.toPaimonType(DecimalType.createDecimalType());
+ assertThat(decimalType.asSQLString()).isEqualTo("DECIMAL(38, 0)");
+
+ DataType tinyintType = PaimonTypeUtils.toPaimonType(TinyintType.TINYINT);
+ assertThat(tinyintType.asSQLString()).isEqualTo("TINYINT");
+
+ DataType smallintType = PaimonTypeUtils.toPaimonType(SmallintType.SMALLINT);
+ assertThat(smallintType.asSQLString()).isEqualTo("SMALLINT");
+
+ DataType intType = PaimonTypeUtils.toPaimonType(IntegerType.INTEGER);
+ assertThat(intType.asSQLString()).isEqualTo("INT");
+
+ DataType bigintType = PaimonTypeUtils.toPaimonType(BigintType.BIGINT);
+ assertThat(bigintType.asSQLString()).isEqualTo("BIGINT");
+
+ DataType floatType = PaimonTypeUtils.toPaimonType(RealType.REAL);
+ assertThat(floatType.asSQLString()).isEqualTo("FLOAT");
+
+ DataType doubleType = PaimonTypeUtils.toPaimonType(DoubleType.DOUBLE);
+ assertThat(doubleType.asSQLString()).isEqualTo("DOUBLE");
+
+ DataType dateType = PaimonTypeUtils.toPaimonType(DateType.DATE);
+ assertThat(dateType.asSQLString()).isEqualTo("DATE");
+
+ DataType timeType =
+ PaimonTypeUtils.toPaimonType(com.facebook.presto.common.type.TimeType.TIME);
+ assertThat(timeType.asSQLString()).isEqualTo("TIME(0)");
+
+ DataType timestampType = PaimonTypeUtils.toPaimonType(TimestampType.TIMESTAMP);
+ assertThat(timestampType.asSQLString()).isEqualTo("TIMESTAMP(6)");
+
+ DataType timestampWithTimeZoneType =
+ PaimonTypeUtils.toPaimonType(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE);
+ assertThat(timestampWithTimeZoneType.asSQLString())
+ .isEqualTo("TIMESTAMP(6) WITH LOCAL TIME ZONE");
+
+ DataType arrayType = PaimonTypeUtils.toPaimonType(new ArrayType(IntegerType.INTEGER));
+ assertThat(arrayType.asSQLString()).isEqualTo("ARRAY");
+
+ FunctionAndTypeManager testFunctionAndTypeManager = createTestFunctionAndTypeManager();
+ Type parameterizedType =
+ testFunctionAndTypeManager.getParameterizedType(
+ StandardTypes.MAP,
+ ImmutableList.of(
+ TypeSignatureParameter.of(BigintType.BIGINT.getTypeSignature()),
+ TypeSignatureParameter.of(
+ VarcharType.createUnboundedVarcharType()
+ .getTypeSignature())));
+ DataType mapType = PaimonTypeUtils.toPaimonType(parameterizedType);
+ assertThat(mapType.asSQLString()).isEqualTo("MAP");
+
+ List fields = new ArrayList<>();
+ fields.add(new RowType.Field(java.util.Optional.of("id"), IntegerType.INTEGER));
+ fields.add(
+ new RowType.Field(
+ java.util.Optional.of("name"), VarcharType.createUnboundedVarcharType()));
+ Type type = RowType.from(fields);
+ DataType rowType = PaimonTypeUtils.toPaimonType(type);
+ assertThat(rowType.asSQLString()).isEqualTo("ROW<`id` INT, `name` STRING>");
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java
new file mode 100644
index 0000000000000..982df248afa2a
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.InnerTableWrite;
+import org.apache.paimon.types.RowType;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+/** A simple table test helper to write and commit. */
+public class SimpleTableTestHelper {
+
+ private final InnerTableWrite writer;
+ private final InnerTableCommit commit;
+
+ public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
+ new SchemaManager(LocalFileIO.create(), path)
+ .createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap() {
+ {
+ put("write-mode", "change-log");
+ }
+ },
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ String user = "user";
+ this.writer = table.newWrite(user);
+ this.commit = table.newCommit(user);
+ }
+
+ public void write(InternalRow row) throws Exception {
+ writer.write(row);
+ }
+
+ public void commit() throws Exception {
+ commit.commit(0, writer.prepareCommit(true, 0));
+ }
+}
diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java
new file mode 100644
index 0000000000000..50572c9b8b718
--- /dev/null
+++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.paimon;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.data.BinaryString.fromString;
+
+/** paimon test util. */
+public class TestPrestoUtils {
+
+ public static byte[] getSerializedTable() throws Exception {
+ String warehouse =
+ Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString();
+ Path tablePath = new Path(warehouse, "test.db/user");
+ SimpleTableTestHelper testHelper = createTestHelper(tablePath);
+ testHelper.write(GenericRow.of(1, 2L, fromString("1"), fromString("1")));
+ testHelper.write(GenericRow.of(3, 4L, fromString("2"), fromString("2")));
+ testHelper.write(GenericRow.of(5, 6L, fromString("3"), fromString("3")));
+ testHelper.write(
+ GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), fromString("2")));
+ testHelper.commit();
+ Map config = new HashMap<>();
+ config.put("warehouse", warehouse);
+ Catalog catalog =
+ CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(config)));
+ Identifier tablePath2 = new Identifier("test", "user");
+ return InstantiationUtil.serializeObject(catalog.getTable(tablePath2));
+ }
+
+ private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
+ RowType rowType =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "a", new IntType()),
+ new DataField(1, "b", new BigIntType()),
+ // test field name has upper case
+ new DataField(2, "aCa", new VarCharType()),
+ new DataField(3, "d", new CharType(1))));
+ return new SimpleTableTestHelper(tablePath, rowType);
+ }
+}