From 2a8b39779667876796db06b80d30260baab63d6e Mon Sep 17 00:00:00 2001 From: Jianjian Date: Tue, 14 Nov 2023 14:32:55 -0800 Subject: [PATCH] add skeleton --- pom.xml | 20 ++ presto-paimon/pom.xml | 231 +++++++++++++ .../presto/paimon/ClassLoaderUtils.java | 35 ++ .../facebook/presto/paimon/EncodingUtils.java | 51 +++ .../presto/paimon/FieldNameUtils.java | 36 ++ .../presto/paimon/PaimonColumnHandle.java | 111 ++++++ .../facebook/presto/paimon/PaimonConfig.java | 59 ++++ .../presto/paimon/PaimonConnector.java | 97 ++++++ .../presto/paimon/PaimonConnectorFactory.java | 83 +++++ .../presto/paimon/PaimonConnectorId.java | 55 +++ .../presto/paimon/PaimonFilterConverter.java | 244 +++++++++++++ .../presto/paimon/PaimonHandleResolver.java | 55 +++ .../presto/paimon/PaimonMetadata.java | 322 ++++++++++++++++++ .../facebook/presto/paimon/PaimonModule.java | 76 +++++ .../presto/paimon/PaimonPageSource.java | 318 +++++++++++++++++ .../paimon/PaimonPageSourceProvider.java | 86 +++++ .../facebook/presto/paimon/PaimonPlugin.java | 32 ++ .../facebook/presto/paimon/PaimonSplit.java | 72 ++++ .../presto/paimon/PaimonSplitManager.java | 53 +++ .../presto/paimon/PaimonSplitSource.java | 61 ++++ .../presto/paimon/PaimonTableHandle.java | 165 +++++++++ .../paimon/PaimonTableLayoutHandle.java | 81 +++++ .../paimon/PaimonTransactionHandle.java | 68 ++++ .../paimon/PaimonTransactionManager.java | 50 +++ .../presto/paimon/PaimonTypeUtils.java | 209 ++++++++++++ .../presto/paimon/PaimonColumnHandleTest.java | 67 ++++ .../presto/paimon/PaimonPluginTest.java | 51 +++ .../presto/paimon/PaimonQueryRunner.java | 101 ++++++ .../presto/paimon/PaimonSplitTest.java | 41 +++ .../presto/paimon/PaimonTableHandleTest.java | 53 +++ .../presto/paimon/PaimonTypeTest.java | 236 +++++++++++++ .../presto/paimon/SimpleTableTestHelper.java | 67 ++++ .../presto/paimon/TestPrestoUtils.java | 78 +++++ 33 files changed, 3364 insertions(+) create mode 100644 presto-paimon/pom.xml create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java create mode 100644 presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java create mode 100644 presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java diff --git a/pom.xml b/pom.xml index a5d1cd4232efe..0905fa835c5f7 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,7 @@ 0.19.0 2.3.1 0.13.1 + 0.5.0-incubating 1.18.3 3.3.0 2.6.0 @@ -191,6 +192,7 @@ redis-hbo-provider presto-singlestore presto-hana + presto-paimon @@ -1333,6 +1335,24 @@ + + + org.apache.paimon + paimon-bundle + ${dep.paimon.version} + + + org.slf4j + slf4j-api + + + com.google.guava + guava + + + + + net.sf.opencsv diff --git a/presto-paimon/pom.xml b/presto-paimon/pom.xml new file mode 100644 index 0000000000000..991667cf69839 --- /dev/null +++ b/presto-paimon/pom.xml @@ -0,0 +1,231 @@ + + + 4.0.0 + + presto-root + com.facebook.presto + 0.285-SNAPSHOT + + presto-paimon + Presto - Paimon Connector + presto-plugin + + + ${project.parent.basedir} + + + + + com.google.guava + guava + + + org.codehaus.mojo + animal-sniffer-annotations + + + compile + + + + com.google.inject + guice + + + + io.airlift + aircompressor + + + + javax.inject + javax.inject + + + + org.weakref + jmxutils + + + + com.facebook.airlift + bootstrap + + + + com.facebook.airlift + concurrent + + + + com.facebook.airlift + configuration + + + + com.facebook.airlift + event + + + + com.facebook.airlift + json + + + + com.facebook.airlift + log + + + + com.facebook.presto + presto-cache + + + + com.facebook.presto + presto-memory-context + + + + com.facebook.presto + presto-plugin-toolkit + + + + com.facebook.presto + presto-parquet + + + + com.facebook.presto + presto-hive-common + + + + com.facebook.presto + presto-hive-metastore + + + + com.facebook.presto + presto-hive + + + + com.facebook.presto.hadoop + hadoop-apache2 + + + + com.facebook.presto.hive + hive-apache + + + + javax.validation + validation-api + + + + org.apache.paimon + paimon-bundle + + + + + org.jetbrains + annotations + test + + + + org.testng + testng + test + + + + com.facebook.presto + presto-tests + test + + + + com.facebook.presto + presto-tpch + test + + + + com.facebook.presto + presto-main + test + + + + com.facebook.presto + presto-hive + test-jar + test + + + + com.facebook.presto + presto-hive-metastore + test-jar + test + + + + + com.facebook.airlift + log-manager + runtime + + + + + com.facebook.presto + presto-spi + provided + + + + com.facebook.presto + presto-common + provided + + + + io.airlift + slice + provided + + + + com.facebook.drift + drift-api + provided + + + + com.fasterxml.jackson.core + jackson-annotations + 2.10.2 + provided + + + + org.openjdk.jol + jol-core + provided + + + + io.airlift + units + provided + + + diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java new file mode 100644 index 0000000000000..2c5a381d1b45c --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/ClassLoaderUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import java.util.function.Supplier; + +/** Utils for {@link ClassLoader}. */ +public class ClassLoaderUtils { + + public static T runWithContextClassLoader(Supplier supplier, ClassLoader classLoader) { + ClassLoader previous = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(classLoader); + try { + return supplier.get(); + } finally { + Thread.currentThread().setContextClassLoader(previous); + } + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java new file mode 100644 index 0000000000000..eee3d566ebee6 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/EncodingUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.paimon.utils.InstantiationUtil; + +import java.util.Base64; + +/** Utils for encoding. */ +public class EncodingUtils { + + private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding(); + + private static final Base64.Decoder BASE64_DECODER = Base64.getUrlDecoder(); + + public static String encodeObjectToString(T t) { + try { + byte[] bytes = InstantiationUtil.serializeObject(t); + return new String(BASE64_ENCODER.encode(bytes), UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static T decodeStringToObject(String encodedStr) { + final byte[] bytes = BASE64_DECODER.decode(encodedStr.getBytes(UTF_8)); + try { + return InstantiationUtil.deserializeObject(bytes, EncodingUtils.class.getClassLoader()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java new file mode 100644 index 0000000000000..ff6e8a4e20312 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/FieldNameUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; + +import java.util.List; +import java.util.stream.Collectors; + +/** Utils for fieldName. */ +public class FieldNameUtils { + + public static List fieldNames(RowType rowType) { + return rowType.getFields().stream() + .map(DataField::name) + .map(String::toLowerCase) + .collect(Collectors.toList()); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java new file mode 100644 index 0000000000000..ef74cdefd3ad1 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonColumnHandle.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static java.util.Objects.requireNonNull; + +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.JsonSerdeUtil; + +/** Presto {@link ColumnHandle}. */ +public class PaimonColumnHandle implements ColumnHandle { + + private final String columnName; + private final String typeString; + private final Type prestoType; + + @JsonCreator + public PaimonColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("typeString") String typeString, + @JsonProperty("prestoType") Type prestoType) { + this.columnName = requireNonNull(columnName, "columnName is null"); + this.typeString = requireNonNull(typeString, "typeString is null"); + this.prestoType = requireNonNull(prestoType, "columnType is null"); + } + + public static PaimonColumnHandle create( + String columnName, DataType columnType, TypeManager typeManager) { + return new PaimonColumnHandle( + columnName, + JsonSerdeUtil.toJson(columnType), + PaimonTypeUtils.toPrestoType(columnType, typeManager)); + } + + @JsonProperty + public String getColumnName() { + return columnName; + } + + @JsonProperty + public String getTypeString() { + return typeString; + } + + public DataType paimonType() { + return JsonSerdeUtil.fromJson(typeString, DataType.class); + } + + @JsonProperty + public Type getPrestoType() { + return prestoType; + } + + public ColumnMetadata getColumnMetadata() { + return new ColumnMetadata(columnName, prestoType); + } + + @Override + public int hashCode() { + return columnName.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + PaimonColumnHandle other = (PaimonColumnHandle) obj; + return columnName.equals(other.columnName); + } + + @Override + public String toString() { + return "{" + + "columnName='" + + columnName + + '\'' + + ", typeString='" + + typeString + + '\'' + + ", prestoType=" + + prestoType + + '}'; + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java new file mode 100644 index 0000000000000..2566b494b3e72 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.airlift.configuration.Config; + +/** Used for configuration item inspection and management. */ +public class PaimonConfig { + + private String warehouse; + private String metastore; + private String uri; + + public String getWarehouse() { + return warehouse; + } + + @Config("warehouse") + public PaimonConfig setWarehouse(String warehouse) { + this.warehouse = warehouse; + return this; + } + + public String getMetastore() { + return metastore; + } + + @Config("metastore") + public PaimonConfig setMetastore(String metastore) { + this.metastore = metastore; + return this; + } + + public String getUri() { + return uri; + } + + @Config("uri") + public PaimonConfig setUri(String uri) { + this.uri = uri; + return this; + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java new file mode 100644 index 0000000000000..69aa742eb1b30 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnector.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED; +import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; + +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorCommitHandle; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.connector.EmptyConnectorCommitHandle; +import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata; +import com.facebook.presto.spi.transaction.IsolationLevel; + +import javax.inject.Inject; + +/** Presto {@link Connector}. */ +public class PaimonConnector implements Connector { + + private final PaimonTransactionManager transactionManager; + private final PaimonSplitManager paimonSplitManager; + private final PaimonPageSourceProvider paimonPageSourceProvider; + private final PaimonMetadata paimonMetadata; + + @Inject + public PaimonConnector( + PaimonTransactionManager transactionManager, + PaimonSplitManager paimonSplitManager, + PaimonPageSourceProvider paimonPageSourceProvider, + PaimonMetadata paimonMetadata) { + this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + this.paimonSplitManager = requireNonNull(paimonSplitManager, "paimonSplitManager is null"); + this.paimonPageSourceProvider = + requireNonNull(paimonPageSourceProvider, "paimonPageSourceProvider is null"); + this.paimonMetadata = requireNonNull(paimonMetadata, "paimonMetadata is null"); + } + + @Override + public ConnectorCommitHandle commit(ConnectorTransactionHandle transaction) { + transactionManager.remove(transaction); + return EmptyConnectorCommitHandle.INSTANCE; + } + + @Override + public ConnectorTransactionHandle beginTransaction( + IsolationLevel isolationLevel, boolean readOnly) { + checkConnectorSupports(READ_COMMITTED, isolationLevel); + ConnectorTransactionHandle transaction = new PaimonTransactionHandle(); + try (ThreadContextClassLoader ignored = + new ThreadContextClassLoader(getClass().getClassLoader())) { + transactionManager.put(transaction, paimonMetadata); + } + return transaction; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) { + ConnectorMetadata metadata = transactionManager.get(transactionHandle); + return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader()); + } + + @Override + public ConnectorSplitManager getSplitManager() { + return paimonSplitManager; + } + + @Override + public ConnectorPageSourceProvider getPageSourceProvider() { + return paimonPageSourceProvider; + } + + @Override + public void rollback(ConnectorTransactionHandle transaction) { + transactionManager.remove(transaction); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java new file mode 100644 index 0000000000000..996d3c58c0554 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static com.google.common.base.Throwables.throwIfUnchecked; +import static java.util.Objects.requireNonNull; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.inject.Injector; + +import java.lang.reflect.Method; +import java.util.Map; + +/** Presto {@link ConnectorFactory}. */ +public class PaimonConnectorFactory implements ConnectorFactory { + + @Override + public String getName() { + return "paimon"; + } + + @Override + public ConnectorHandleResolver getHandleResolver() { + return new PaimonHandleResolver(); + } + + @Override + public Connector create( + String catalogName, Map config, ConnectorContext context) { + requireNonNull(config, "config is null"); + + try { + Bootstrap app = + new Bootstrap( + new JsonModule(), + new PaimonModule( + catalogName, + context.getTypeManager(), + context.getFunctionMetadataManager(), + context.getStandardFunctionResolution(), + context.getRowExpressionService(), + config)); + + Bootstrap bootstrap = + app.doNotInitializeLogging().setRequiredConfigurationProperties(config).quiet(); + try { + // Using reflection to achieve compatibility with different versions of + // dependencies. + Method noStrictConfigMethod = Bootstrap.class.getMethod("noStrictConfig"); + noStrictConfigMethod.invoke(bootstrap); + } catch (NoSuchMethodException e) { + // ignore + } + Injector injector = bootstrap.initialize(); + + return injector.getInstance(PaimonConnector.class); + } catch (Exception e) { + throwIfUnchecked(e); + throw new RuntimeException(e); + } + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java new file mode 100644 index 0000000000000..8cc96200ead29 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonConnectorId.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static java.util.Objects.requireNonNull; + +import java.util.Objects; + +/** Wrap for connector id. */ +public final class PaimonConnectorId { + + private final String id; + + public PaimonConnectorId(String id) { + this.id = requireNonNull(id, "id is null"); + } + + @Override + public String toString() { + return id; + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + PaimonConnectorId other = (PaimonConnectorId) obj; + return Objects.equals(this.id, other.id); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java new file mode 100644 index 0000000000000..1ae4867f1f9ab --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonFilterConverter.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.common.predicate.Domain; +import com.facebook.presto.common.predicate.Range; +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.CharType; +import com.facebook.presto.common.type.DateType; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.MapType; +import com.facebook.presto.common.type.RealType; +import com.facebook.presto.common.type.TimeType; +import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarbinaryType; +import com.facebook.presto.common.type.VarcharType; +import io.airlift.slice.Slice; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.shade.guava30.com.google.common.base.Preconditions; +import org.apache.paimon.types.RowType; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +/** Presto filter to Paimon predicate. */ +public class PaimonFilterConverter { + + private final RowType rowType; + private final PredicateBuilder builder; + + public PaimonFilterConverter(RowType rowType) { + this.rowType = rowType; + this.builder = new PredicateBuilder(rowType); + } + + public Optional convert(TupleDomain tupleDomain) { + if (tupleDomain.isAll()) { + return Optional.empty(); + } + + if (!tupleDomain.getDomains().isPresent()) { + return Optional.empty(); + } + + Map domainMap = tupleDomain.getDomains().get(); + List conjuncts = new ArrayList<>(); + for (Map.Entry entry : domainMap.entrySet()) { + PaimonColumnHandle columnHandle = entry.getKey(); + Domain domain = entry.getValue(); + int index = rowType.getFieldNames().indexOf(columnHandle.getColumnName()); + if (index != -1) { + try { + conjuncts.add(toPredicate(index, columnHandle.getPrestoType(), domain)); + } catch (UnsupportedOperationException ignored) { + } + } + } + + if (conjuncts.isEmpty()) { + return Optional.empty(); + } + return Optional.of(PredicateBuilder.and(conjuncts)); + } + + private Predicate toPredicate(int columnIndex, Type type, Domain domain) { + if (domain.isAll()) { + // TODO alwaysTrue + throw new UnsupportedOperationException(); + } + if (domain.getValues().isNone()) { + if (domain.isNullAllowed()) { + return builder.isNull((columnIndex)); + } + // TODO alwaysFalse + throw new UnsupportedOperationException(); + } + + if (domain.getValues().isAll()) { + if (domain.isNullAllowed()) { + // TODO alwaysTrue + throw new UnsupportedOperationException(); + } + return builder.isNotNull((columnIndex)); + } + + // TODO support structural types + if (type instanceof ArrayType + || type instanceof MapType + || type instanceof com.facebook.presto.common.type.RowType) { + // Fail fast. Ignoring expression could lead to data loss in case of deletions. + throw new UnsupportedOperationException(); + } + + if (type.isOrderable()) { + List orderedRanges = domain.getValues().getRanges().getOrderedRanges(); + List values = new ArrayList<>(); + List predicates = new ArrayList<>(); + for (Range range : orderedRanges) { + if (range.isSingleValue()) { + values.add(getLiteralValue(type, range.getLowBoundedValue())); + } else { + predicates.add(toPredicate(columnIndex, range)); + } + } + + if (!values.isEmpty()) { + predicates.add(builder.in(columnIndex, values)); + } + + if (domain.isNullAllowed()) { + predicates.add(builder.isNull(columnIndex)); + } + return PredicateBuilder.or(predicates); + } + + throw new UnsupportedOperationException(); + } + + private Predicate toPredicate(int columnIndex, Range range) { + Type type = range.getType(); + + if (range.isSingleValue()) { + Object value = getLiteralValue(type, range.getSingleValue()); + return builder.equal(columnIndex, value); + } + + List conjuncts = new ArrayList<>(2); + if (!range.isLowUnbounded()) { + Object low = getLiteralValue(type, range.getLowBoundedValue()); + Predicate lowBound; + if (range.isLowInclusive()) { + lowBound = builder.greaterOrEqual(columnIndex, low); + } else { + lowBound = builder.greaterThan(columnIndex, low); + } + conjuncts.add(lowBound); + } + + if (!range.isHighUnbounded()) { + Object high = getLiteralValue(type, range.getHighBoundedValue()); + Predicate highBound; + if (range.isHighInclusive()) { + highBound = builder.lessOrEqual(columnIndex, high); + } else { + highBound = builder.lessThan(columnIndex, high); + } + conjuncts.add(highBound); + } + + return PredicateBuilder.and(conjuncts); + } + + private Object getLiteralValue(Type type, Object prestoNativeValue) { + Objects.requireNonNull(prestoNativeValue, "prestoNativeValue is null"); + + if (type instanceof BooleanType) { + return prestoNativeValue; + } + + if (type instanceof IntegerType) { + return Math.toIntExact((long) prestoNativeValue); + } + + if (type instanceof BigintType) { + return prestoNativeValue; + } + + if (type instanceof RealType) { + return Float.intBitsToFloat(Math.toIntExact((long) prestoNativeValue)); + } + + if (type instanceof DoubleType) { + return prestoNativeValue; + } + + if (type instanceof DateType) { + return Math.toIntExact(((Long) prestoNativeValue)); + } + + if (type instanceof TimestampType || type instanceof TimeType) { + return TimeUnit.MILLISECONDS.toMicros((Long) prestoNativeValue); + } + + if (type instanceof VarcharType || type instanceof CharType) { + return BinaryString.fromBytes(((Slice) prestoNativeValue).getBytes()); + } + + if (type instanceof VarbinaryType) { + return ((Slice) prestoNativeValue).getBytes(); + } + + if (type instanceof DecimalType) { + DecimalType decimalType = (DecimalType) type; + Object value = + Objects.requireNonNull( + prestoNativeValue, "The prestoNativeValue must be non-null"); + if (Decimals.isShortDecimal(decimalType)) { + Preconditions.checkArgument( + value instanceof Long, + "A short decimal should be represented by a Long value but was %s", + value.getClass().getName()); + return BigDecimal.valueOf((long) value).movePointLeft(decimalType.getScale()); + } + Preconditions.checkArgument( + value instanceof Slice, + "A long decimal should be represented by a Slice value but was %s", + value.getClass().getName()); + return new BigDecimal( + Decimals.decodeUnscaledValue((Slice) value), decimalType.getScale()); + } + + throw new UnsupportedOperationException("Unsupported type: " + type); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java new file mode 100644 index 0000000000000..538ea5cd87919 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonHandleResolver.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +/** Presto {@link ConnectorHandleResolver}. */ +public class PaimonHandleResolver implements ConnectorHandleResolver { + + @Override + public Class getTableHandleClass() { + return PaimonTableHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() { + return PaimonTableLayoutHandle.class; + } + + @Override + public Class getColumnHandleClass() { + return PaimonColumnHandle.class; + } + + @Override + public Class getSplitClass() { + return PaimonSplit.class; + } + + @Override + public Class getTransactionHandleClass() { + return PaimonTransactionHandle.class; + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java new file mode 100644 index 0000000000000..09d6e712d6fc9 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonMetadata.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorNewTableLayout; +import com.facebook.presto.spi.ConnectorOutputTableHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorOutputMetadata; +import com.facebook.presto.spi.statistics.ComputedStatistics; +import io.airlift.slice.Slice; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.security.SecurityContext; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.utils.InstantiationUtil; +import org.apache.paimon.utils.StringUtils; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; +import javax.inject.Inject; + +/** Presto {@link ConnectorMetadata}. */ +public class PaimonMetadata implements ConnectorMetadata { + + private final Catalog catalog; + private final TypeManager typeManager; + + @Inject + public PaimonMetadata(Options catalogOptions, TypeManager typeManager) { + try { + SecurityContext.install(catalogOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + this.catalog = CatalogFactory.createCatalog(CatalogContext.create(catalogOptions)); + this.typeManager = typeManager; + } + + @Override + public List listSchemaNames(ConnectorSession session) { + return listSchemaNames(); + } + + private List listSchemaNames() { + return catalog.listDatabases(); + } + + @Override + public boolean schemaExists(ConnectorSession session, String schemaName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(schemaName), + "schemaName cannot be null or empty"); + return catalog.databaseExists(schemaName); + } + + @Override + public void createSchema( + ConnectorSession session, String schemaName, Map properties) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(schemaName), + "schemaName cannot be null or empty"); + try { + catalog.createDatabase(schemaName, true); + } catch (Catalog.DatabaseAlreadyExistException e) { + throw new RuntimeException(e); + } + } + + @Override + public void dropSchema(ConnectorSession session, String schemaName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(schemaName), + "schemaName cannot be null or empty"); + try { + catalog.dropDatabase(schemaName, true, true); + } catch (Catalog.DatabaseNotExistException | Catalog.DatabaseNotEmptyException e) { + throw new RuntimeException(e); + } + } + + @Override + public PaimonTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { + return getTableHandle(tableName); + } + + public PaimonTableHandle getTableHandle(SchemaTableName tableName) { + Identifier tablePath = new Identifier(tableName.getSchemaName(), tableName.getTableName()); + byte[] serializedTable; + try { + serializedTable = InstantiationUtil.serializeObject(catalog.getTable(tablePath)); + } catch (Catalog.TableNotExistException e) { + return null; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return new PaimonTableHandle( + tableName.getSchemaName(), tableName.getTableName(), serializedTable); + } + + @Override + public List getTableLayouts( + ConnectorSession session, + ConnectorTableHandle table, + Constraint constraint, + Optional> desiredColumns) { + PaimonTableHandle handle = (PaimonTableHandle) table; + ConnectorTableLayout layout = + new ConnectorTableLayout( + new PaimonTableLayoutHandle(handle, constraint.getSummary())); + return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); + } + + @Override + public ConnectorTableLayout getTableLayout( + ConnectorSession session, ConnectorTableLayoutHandle handle) { + return new ConnectorTableLayout(handle); + } + + @Override + public ConnectorTableMetadata getTableMetadata( + ConnectorSession session, ConnectorTableHandle table) { + return ((PaimonTableHandle) table).tableMetadata(typeManager); + } + + @Override + public Map getColumnHandles( + ConnectorSession session, ConnectorTableHandle tableHandle) { + PaimonTableHandle table = (PaimonTableHandle) tableHandle; + Map handleMap = new HashMap<>(); + for (ColumnMetadata column : table.columnMetadatas(typeManager)) { + handleMap.put(column.getName(), table.columnHandle(column.getName(), typeManager)); + } + return handleMap; + } + + @Override + public ColumnMetadata getColumnMetadata( + ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) { + return ((PaimonColumnHandle) columnHandle).getColumnMetadata(); + } + + @Override + public List listTables(ConnectorSession session, Optional schemaName) { + List tables = new ArrayList<>(); + schemaName + .map(Collections::singletonList) + .orElseGet(catalog::listDatabases) + .forEach(schema -> tables.addAll(listTables(schema))); + return tables; + } + + private List listTables(String schema) { + try { + return catalog.listTables(schema).stream() + .map(table -> new SchemaTableName(schema, table)) + .collect(toList()); + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException(e); + } + } + + // TODO: Need to insert method,the pr of presto writer will do this + @Override + public ConnectorOutputTableHandle beginCreateTable( + ConnectorSession session, + ConnectorTableMetadata tableMetadata, + Optional layout) { + return null; + } + + @Override + public void createTable( + ConnectorSession session, + ConnectorTableMetadata tableMetadata, + boolean ignoreExisting) { + SchemaTableName table = tableMetadata.getTable(); + Identifier identifier = Identifier.create(table.getSchemaName(), table.getTableName()); + + try { + catalog.createTable(identifier, prepareSchema(tableMetadata), true); + } catch (Catalog.TableAlreadyExistException e) { + throw new RuntimeException(format("table already existed: '%s'", table.getTableName())); + } catch (Catalog.DatabaseNotExistException e) { + throw new RuntimeException(format("database not exists: '%s'", table.getSchemaName())); + } + } + + // TODO + @Override + public Optional finishCreateTable( + ConnectorSession session, + ConnectorOutputTableHandle tableHandle, + Collection fragments, + Collection computedStatistics) { + return Optional.empty(); + } + + // TODO: options is not set + private Schema prepareSchema(ConnectorTableMetadata tableMetadata) { + Schema.Builder builder = + Schema.newBuilder() + .primaryKey(getPrimaryKeys(tableMetadata.getProperties())) + .partitionKeys(getPartitionedKeys(tableMetadata.getProperties())); + + for (ColumnMetadata column : tableMetadata.getColumns()) { + builder.column( + column.getName(), + PaimonTypeUtils.toPaimonType(column.getType()), + column.getComment()); + } + return builder.build(); + } + + private List getPartitionedKeys(Map tableProperties) { + List partitionedKeys = + (List) tableProperties.get(CoreOptions.PARTITION.key()); + return partitionedKeys == null ? ImmutableList.of() : ImmutableList.copyOf(partitionedKeys); + } + + private List getPrimaryKeys(Map tableProperties) { + List primaryKeys = + (List) tableProperties.get(CoreOptions.PRIMARY_KEY.key()); + return primaryKeys == null ? ImmutableList.of() : ImmutableList.copyOf(primaryKeys); + } + + @Override + public void renameTable( + ConnectorSession session, + ConnectorTableHandle tableHandle, + SchemaTableName newTableName) { + PaimonTableHandle oldTableHandle = (PaimonTableHandle) tableHandle; + try { + catalog.renameTable( + new Identifier(oldTableHandle.getSchemaName(), oldTableHandle.getTableName()), + new Identifier(newTableName.getSchemaName(), newTableName.getTableName()), + true); + } catch (Catalog.TableNotExistException | Catalog.TableAlreadyExistException e) { + throw new RuntimeException(e); + } + } + + @Override + public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { + PaimonTableHandle paimonTableHandle = (PaimonTableHandle) tableHandle; + try { + catalog.dropTable( + new Identifier( + paimonTableHandle.getSchemaName(), paimonTableHandle.getTableName()), + true); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } + + @Override + public Map> listTableColumns( + ConnectorSession session, SchemaTablePrefix prefix) { + requireNonNull(prefix, "prefix is null"); + List tableNames; + if (prefix.getTableName() != null) { + tableNames = Collections.singletonList(prefix.toSchemaTableName()); + } else { + tableNames = listTables(session, Optional.of(prefix.getSchemaName())); + } + + return tableNames.stream() + .collect( + toMap( + Function.identity(), + table -> + getTableHandle(session, table) + .columnMetadatas(typeManager))); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java new file mode 100644 index 0000000000000..51d8733073b7c --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonModule.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static java.util.Objects.requireNonNull; + +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.relation.RowExpressionService; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Scopes; +import org.apache.paimon.options.Options; + +import java.util.Map; + +/** Module for binding instance. */ +public class PaimonModule implements Module { + + private final String connectorId; + private final TypeManager typeManager; + private final FunctionMetadataManager functionMetadataManager; + private final StandardFunctionResolution standardFunctionResolution; + private final RowExpressionService rowExpressionService; + private final Map config; + + public PaimonModule( + String connectorId, + TypeManager typeManager, + FunctionMetadataManager functionMetadataManager, + StandardFunctionResolution standardFunctionResolution, + RowExpressionService rowExpressionService, + Map config) { + this.connectorId = requireNonNull(connectorId, "catalogName is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.functionMetadataManager = functionMetadataManager; + this.standardFunctionResolution = standardFunctionResolution; + this.rowExpressionService = rowExpressionService; + this.config = config; + } + + @Override + public void configure(Binder binder) { + binder.bind(PaimonConnectorId.class).toInstance(new PaimonConnectorId(connectorId)); + binder.bind(TypeManager.class).toInstance(typeManager); + binder.bind(PaimonConnector.class).in(Scopes.SINGLETON); + binder.bind(PaimonMetadata.class).in(Scopes.SINGLETON); + binder.bind(PaimonSplitManager.class).in(Scopes.SINGLETON); + binder.bind(PaimonPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(FunctionMetadataManager.class).toInstance(functionMetadataManager); + binder.bind(StandardFunctionResolution.class).toInstance(standardFunctionResolution); + binder.bind(RowExpressionService.class).toInstance(rowExpressionService); + binder.bind(Options.class).toInstance(Options.fromMap(config)); + binder.bind(PaimonTransactionManager.class).in(Scopes.SINGLETON); + + configBinder(binder).bindConfig(PaimonConfig.class); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java new file mode 100644 index 0000000000000..908086e1b1c4c --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSource.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.DateType.DATE; +import static com.facebook.presto.common.type.Decimals.encodeShortScaledValue; +import static com.facebook.presto.common.type.Decimals.isLongDecimal; +import static com.facebook.presto.common.type.Decimals.isShortDecimal; +import static com.facebook.presto.common.type.IntegerType.INTEGER; +import static com.facebook.presto.common.type.RealType.REAL; +import static com.facebook.presto.common.type.SmallintType.SMALLINT; +import static com.facebook.presto.common.type.TimeType.TIME; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TinyintType.TINYINT; +import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; +import static io.airlift.slice.Slices.wrappedBuffer; +import static java.lang.String.format; + +import com.facebook.presto.common.Page; +import com.facebook.presto.common.PageBuilder; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.CharType; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.common.type.MapType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarbinaryType; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.PrestoException; +import io.airlift.slice.Slice; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.guava30.com.google.common.base.Verify; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeChecks; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.InternalRowUtils; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; + +/** Presto {@link ConnectorPageSource}. */ +public class PaimonPageSource implements ConnectorPageSource { + + private static final int ROWS_PER_REQUEST = 4096; + private final CloseableIterator iterator; + private final PageBuilder pageBuilder; + private final List prestoColumnTypes; + private final List paimonColumnTypes; + private boolean isFinished = false; + private long numReturn = 0; + + public PaimonPageSource(RecordReader reader, List projectedColumns) { + this.iterator = reader.toCloseableIterator(); + this.prestoColumnTypes = new ArrayList<>(); + this.paimonColumnTypes = new ArrayList<>(); + for (ColumnHandle handle : projectedColumns) { + PaimonColumnHandle paimonColumnHandle = (PaimonColumnHandle) handle; + PaimonPageSource.this.prestoColumnTypes.add(paimonColumnHandle.getPrestoType()); + PaimonPageSource.this.paimonColumnTypes.add(paimonColumnHandle.paimonType()); + } + + this.pageBuilder = new PageBuilder(PaimonPageSource.this.prestoColumnTypes); + } + + private static void writeSlice(BlockBuilder output, Type type, Object value) { + if (type instanceof VarcharType || type instanceof CharType) { + type.writeSlice(output, wrappedBuffer(((BinaryString) value).toBytes())); + } else if (type instanceof VarbinaryType) { + type.writeSlice(output, wrappedBuffer((byte[]) value)); + } else { + throw new PrestoException( + GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature()); + } + } + + private static void writeObject(BlockBuilder output, Type type, Object value) { + if (type instanceof DecimalType) { + Verify.verify(isLongDecimal(type), "The type should be long decimal"); + DecimalType decimalType = (DecimalType) type; + BigDecimal decimal = ((Decimal) value).toBigDecimal(); + type.writeSlice(output, Decimals.encodeScaledValue(decimal, decimalType.getScale())); + } else { + throw new PrestoException( + GENERIC_INTERNAL_ERROR, + "Unhandled type for Object: " + type.getTypeSignature()); + } + } + + @Override + public long getCompletedBytes() { + return 0; + } + + @Override + public long getCompletedPositions() { + return 0; + } + + @Override + public long getReadTimeNanos() { + return 0; + } + + @Override + public boolean isFinished() { + return isFinished; + } + + @Override + public Page getNextPage() { + return ClassLoaderUtils.runWithContextClassLoader( + () -> { + try { + return nextPage(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, + PaimonPageSource.class.getClassLoader()); + } + + @Override + public long getSystemMemoryUsage() { + return 0; + } + + @Nullable + private Page nextPage() throws IOException { + int count = 0; + while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) { + if (!iterator.hasNext()) { + isFinished = true; + return returnPage(count); + } + + InternalRow row = iterator.next(); + pageBuilder.declarePosition(); + count++; + for (int i = 0; i < prestoColumnTypes.size(); i++) { + BlockBuilder output = pageBuilder.getBlockBuilder(i); + appendTo( + prestoColumnTypes.get(i), + paimonColumnTypes.get(i), + InternalRowUtils.get(row, i, paimonColumnTypes.get(i)), + output); + } + } + + return returnPage(count); + } + + private Page returnPage(int count) { + if (count == 0) { + return null; + } + numReturn += count; + Page page = pageBuilder.build(); + pageBuilder.reset(); + return page; + } + + @Override + public void close() throws IOException { + try { + this.iterator.close(); + } catch (Exception e) { + throw new IOException(e); + } + } + + private void appendTo(Type prestoType, DataType paimonType, Object value, BlockBuilder output) { + if (value == null) { + output.appendNull(); + return; + } + + Class javaType = prestoType.getJavaType(); + if (javaType == boolean.class) { + prestoType.writeBoolean(output, (Boolean) value); + } else if (javaType == long.class) { + if (prestoType.equals(BIGINT) + || prestoType.equals(INTEGER) + || prestoType.equals(TINYINT) + || prestoType.equals(SMALLINT) + || prestoType.equals(DATE)) { + prestoType.writeLong(output, ((Number) value).longValue()); + } else if (prestoType.equals(REAL)) { + prestoType.writeLong(output, Float.floatToIntBits((Float) value)); + } else if (prestoType instanceof DecimalType) { + Verify.verify(isShortDecimal(prestoType), "The type should be short decimal"); + DecimalType decimalType = (DecimalType) prestoType; + BigDecimal decimal = ((Decimal) value).toBigDecimal(); + prestoType.writeLong( + output, encodeShortScaledValue(decimal, decimalType.getScale())); + } else if (prestoType.equals(TIMESTAMP)) { + prestoType.writeLong( + output, + ((Timestamp) value) + .toLocalDateTime() + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli()); + } else if (prestoType.equals(TIME)) { + prestoType.writeLong(output, (int) value * 1_000); + } else { + throw new PrestoException( + GENERIC_INTERNAL_ERROR, + format("Unhandled type for %s: %s", javaType.getSimpleName(), prestoType)); + } + } else if (javaType == double.class) { + prestoType.writeDouble(output, ((Number) value).doubleValue()); + } else if (prestoType instanceof DecimalType) { + writeObject(output, prestoType, value); + } else if (javaType == Slice.class) { + writeSlice(output, prestoType, value); + } else if (javaType == Block.class) { + writeBlock(output, prestoType, paimonType, value); + } else { + throw new PrestoException( + GENERIC_INTERNAL_ERROR, + format("Unhandled type for %s: %s", javaType.getSimpleName(), prestoType)); + } + } + + private void writeBlock( + BlockBuilder output, Type prestoType, DataType paimonType, Object value) { + if (prestoType instanceof ArrayType) { + BlockBuilder builder = output.beginBlockEntry(); + + InternalArray arrayData = (InternalArray) value; + DataType elementType = DataTypeChecks.getNestedTypes(paimonType).get(0); + for (int i = 0; i < arrayData.size(); i++) { + appendTo( + prestoType.getTypeParameters().get(0), + elementType, + InternalRowUtils.get(arrayData, i, elementType), + builder); + } + + output.closeEntry(); + return; + } + if (prestoType instanceof RowType) { + InternalRow rowData = (InternalRow) value; + BlockBuilder builder = output.beginBlockEntry(); + for (int index = 0; index < prestoType.getTypeParameters().size(); index++) { + Type fieldType = prestoType.getTypeParameters().get(index); + DataType fieldLogicalType = + ((org.apache.paimon.types.RowType) paimonType).getTypeAt(index); + appendTo( + fieldType, + fieldLogicalType, + InternalRowUtils.get(rowData, index, fieldLogicalType), + builder); + } + output.closeEntry(); + return; + } + if (prestoType instanceof MapType) { + InternalMap mapData = (InternalMap) value; + InternalArray keyArray = mapData.keyArray(); + InternalArray valueArray = mapData.valueArray(); + DataType keyType = ((org.apache.paimon.types.MapType) paimonType).getKeyType(); + DataType valueType = ((org.apache.paimon.types.MapType) paimonType).getValueType(); + BlockBuilder builder = output.beginBlockEntry(); + for (int i = 0; i < keyArray.size(); i++) { + appendTo( + prestoType.getTypeParameters().get(0), + keyType, + InternalRowUtils.get(keyArray, i, keyType), + builder); + appendTo( + prestoType.getTypeParameters().get(1), + valueType, + InternalRowUtils.get(valueArray, i, valueType), + builder); + } + output.closeEntry(); + return; + } + throw new PrestoException( + GENERIC_INTERNAL_ERROR, + "Unhandled type for Block: " + prestoType.getTypeSignature()); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java new file mode 100644 index 0000000000000..9c55f9bc1b61f --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPageSourceProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + + +import static com.facebook.presto.paimon.ClassLoaderUtils.runWithContextClassLoader; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPageSource; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.SplitContext; +import com.facebook.presto.spi.connector.ConnectorPageSourceProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.RowType; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** Presto {@link ConnectorPageSourceProvider}. */ +public class PaimonPageSourceProvider implements ConnectorPageSourceProvider { + + @Override + public ConnectorPageSource createPageSource( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorSplit split, + ConnectorTableLayoutHandle layout, + List columns, + SplitContext splitContext) { + return runWithContextClassLoader( + () -> + createPageSource( + ((PaimonTableLayoutHandle) layout).getTableHandle(), + (PaimonSplit) split, + columns), + PaimonPageSourceProvider.class.getClassLoader()); + } + + private ConnectorPageSource createPageSource( + PaimonTableHandle tableHandle, PaimonSplit split, List columns) { + Table table = tableHandle.table(); + ReadBuilder read = table.newReadBuilder(); + RowType rowType = table.rowType(); + List fieldNames = FieldNameUtils.fieldNames(rowType); + List projectedFields = + columns.stream() + .map(PaimonColumnHandle.class::cast) + .map(PaimonColumnHandle::getColumnName) + .collect(Collectors.toList()); + if (!fieldNames.equals(projectedFields)) { + int[] projected = projectedFields.stream().mapToInt(fieldNames::indexOf).toArray(); + read.withProjection(projected); + } + + new PaimonFilterConverter(rowType) + .convert(tableHandle.getFilter()) + .ifPresent(read::withFilter); + + try { + return new PaimonPageSource(read.newRead().createReader(split.decodeSplit()), columns); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java new file mode 100644 index 0000000000000..fbffc45e73e71 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonPlugin.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.ConnectorFactory; + +import java.util.Collections; + +/** Presto {@link Plugin}. */ +public class PaimonPlugin implements Plugin { + @Override + public Iterable getConnectorFactories() { + return Collections.singletonList(new PaimonConnectorFactory()); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java new file mode 100644 index 0000000000000..c010578609adc --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplit.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; + +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.table.source.Split; + +import java.util.Collections; +import java.util.List; + +/** Presto {@link ConnectorSplit}. */ +public class PaimonSplit implements ConnectorSplit { + + private final String splitSerialized; + + @JsonCreator + public PaimonSplit(@JsonProperty("splitSerialized") String splitSerialized) { + this.splitSerialized = splitSerialized; + } + + public static PaimonSplit fromSplit(Split split) { + return new PaimonSplit(EncodingUtils.encodeObjectToString(split)); + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) { + return ImmutableList.of(); + } + + public Split decodeSplit() { + return EncodingUtils.decodeStringToObject(splitSerialized); + } + + @JsonProperty + public String getSplitSerialized() { + return splitSerialized; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() { + return NO_PREFERENCE; + } + + @Override + public Object getInfo() { + return Collections.emptyMap(); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java new file mode 100644 index 0000000000000..5f802fac36e86 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.Split; + +import java.util.List; +import java.util.stream.Collectors; + +/** Presto {@link ConnectorSplitManager}. */ +public class PaimonSplitManager implements ConnectorSplitManager { + + @Override + public ConnectorSplitSource getSplits( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorTableLayoutHandle layout, + SplitSchedulingContext splitSchedulingContext) { + + PaimonTableHandle tableHandle = ((PaimonTableLayoutHandle) layout).getTableHandle(); + Table table = tableHandle.table(); + ReadBuilder readBuilder = table.newReadBuilder(); + new PaimonFilterConverter(table.rowType()) + .convert(tableHandle.getFilter()) + .ifPresent(readBuilder::withFilter); + List splits = readBuilder.newScan().plan().splits(); + return new PaimonSplitSource( + splits.stream().map(PaimonSplit::fromSplit).collect(Collectors.toList())); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java new file mode 100644 index 0000000000000..9690c15cd3737 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonSplitSource.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.connector.ConnectorPartitionHandle; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +/** Presto {@link ConnectorSplitSource}. */ +public class PaimonSplitSource implements ConnectorSplitSource { + + private final Queue splits; + + public PaimonSplitSource(List splits) { + this.splits = new LinkedList<>(splits); + } + + @Override + public CompletableFuture getNextBatch( + ConnectorPartitionHandle partitionHandle, int maxSize) { + List batch = new ArrayList<>(); + for (int i = 0; i < maxSize; i++) { + PaimonSplit split = splits.poll(); + if (split == null) { + break; + } + batch.add(split); + } + return CompletableFuture.completedFuture(new ConnectorSplitBatch(batch, isFinished())); + } + + @Override + public void close() {} + + @Override + public boolean isFinished() { + return splits.isEmpty(); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java new file mode 100644 index 0000000000000..a41b71e724d03 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableHandle.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.table.Table; +import org.apache.paimon.utils.InstantiationUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +/** Presto {@link ConnectorTableHandle}. */ +public class PaimonTableHandle implements ConnectorTableHandle { + + private final String schemaName; + private final String tableName; + private final byte[] serializedTable; + private final TupleDomain filter; + private final Optional> projectedColumns; + + private Table lazyTable; + + public PaimonTableHandle(String schemaName, String tableName, byte[] serializedTable) { + this(schemaName, tableName, serializedTable, TupleDomain.all(), Optional.empty()); + } + + @JsonCreator + public PaimonTableHandle( + @JsonProperty("schemaName") String schemaName, + @JsonProperty("tableName") String tableName, + @JsonProperty("serializedTable") byte[] serializedTable, + @JsonProperty("filter") TupleDomain filter, + @JsonProperty("projection") Optional> projectedColumns) { + this.schemaName = schemaName; + this.tableName = tableName; + this.serializedTable = serializedTable; + this.filter = filter; + this.projectedColumns = projectedColumns; + } + + @JsonProperty + public String getSchemaName() { + return schemaName; + } + + @JsonProperty + public String getTableName() { + return tableName; + } + + @JsonProperty + public byte[] getSerializedTable() { + return serializedTable; + } + + @JsonProperty + public TupleDomain getFilter() { + return filter; + } + + @JsonProperty + public Optional> getProjectedColumns() { + return projectedColumns; + } + + public PaimonTableHandle copy(TupleDomain filter) { + return new PaimonTableHandle( + schemaName, tableName, serializedTable, filter, projectedColumns); + } + + public PaimonTableHandle copy(Optional> projectedColumns) { + return new PaimonTableHandle( + schemaName, tableName, serializedTable, filter, projectedColumns); + } + + public Table table() { + if (lazyTable == null) { + try { + lazyTable = + InstantiationUtil.deserializeObject( + serializedTable, this.getClass().getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + return lazyTable; + } + + public ConnectorTableMetadata tableMetadata(TypeManager typeManager) { + return new ConnectorTableMetadata( + new SchemaTableName(schemaName, tableName), columnMetadatas(typeManager)); + } + + public List columnMetadatas(TypeManager typeManager) { + return table().rowType().getFields().stream() + .map( + column -> + new ColumnMetadata( + column.name(), + Objects.requireNonNull( + PaimonTypeUtils.toPrestoType( + column.type(), typeManager)))) + .collect(Collectors.toList()); + } + + public PaimonColumnHandle columnHandle(String field, TypeManager typeManager) { + List fieldNames = FieldNameUtils.fieldNames(table().rowType()); + int index = fieldNames.indexOf(field); + if (index == -1) { + throw new RuntimeException( + String.format("Cannot find field %s in schema %s", field, fieldNames)); + } + return PaimonColumnHandle.create(field, table().rowType().getTypeAt(index), typeManager); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonTableHandle that = (PaimonTableHandle) o; + return Arrays.equals(serializedTable, that.serializedTable) + && Objects.equals(schemaName, that.schemaName) + && Objects.equals(tableName, that.tableName) + && Objects.equals(filter, that.filter) + && Objects.equals(projectedColumns, that.projectedColumns); + } + + @Override + public int hashCode() { + return Objects.hash( + schemaName, tableName, filter, projectedColumns, Arrays.hashCode(serializedTable)); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java new file mode 100644 index 0000000000000..50e14a0aed1ab --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTableLayoutHandle.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.common.predicate.TupleDomain; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects; + +import java.util.Objects; + +/** Presto {@link ConnectorTableLayoutHandle}. */ +public class PaimonTableLayoutHandle implements ConnectorTableLayoutHandle { + + private final PaimonTableHandle tableHandle; + private final TupleDomain constraintSummary; + + @JsonCreator + public PaimonTableLayoutHandle( + @JsonProperty("tableHandle") PaimonTableHandle tableHandle, + @JsonProperty("constraintSummary") TupleDomain constraintSummary) { + this.tableHandle = Objects.requireNonNull(tableHandle, "table is null"); + this.constraintSummary = constraintSummary; + } + + @JsonProperty + public PaimonTableHandle getTableHandle() { + return tableHandle; + } + + @JsonProperty + public TupleDomain getConstraintSummary() { + return constraintSummary; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + PaimonTableLayoutHandle other = (PaimonTableLayoutHandle) obj; + return Objects.equals(tableHandle, other.tableHandle) + && Objects.equals(constraintSummary, other.constraintSummary); + } + + @Override + public int hashCode() { + return Objects.hash(tableHandle, constraintSummary); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableHandle", tableHandle) + .add("constraintSummary", constraintSummary) + .toString(); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java new file mode 100644 index 0000000000000..77b438433816d --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionHandle.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static java.util.Objects.requireNonNull; + +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; +import java.util.UUID; + +/** Presto {@link ConnectorTransactionHandle}. */ +public class PaimonTransactionHandle implements ConnectorTransactionHandle { + + private final UUID uuid; + + public PaimonTransactionHandle() { + this(UUID.randomUUID()); + } + + public PaimonTransactionHandle(@JsonProperty("uuid") UUID uuid) { + this.uuid = requireNonNull(uuid, "uuid is null"); + } + + @JsonProperty + public UUID getUuid() { + return uuid; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + PaimonTransactionHandle other = (PaimonTransactionHandle) obj; + return Objects.equals(uuid, other.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(uuid); + } + + @Override + public String toString() { + return uuid.toString(); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java new file mode 100644 index 0000000000000..2760b9d5d5d18 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTransactionManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import org.apache.paimon.shade.guava30.com.google.common.base.Preconditions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** Presto TransactionManager. */ +public class PaimonTransactionManager { + + private final Map transactions = + new ConcurrentHashMap<>(); + + public ConnectorMetadata get(ConnectorTransactionHandle transaction) { + ConnectorMetadata metadata = transactions.get(transaction); + Preconditions.checkArgument(metadata != null, "no such transaction: %s", transaction); + return metadata; + } + + public ConnectorMetadata remove(ConnectorTransactionHandle transaction) { + ConnectorMetadata metadata = transactions.remove(transaction); + Preconditions.checkArgument(metadata != null, "no such transaction: %s", transaction); + return metadata; + } + + public void put(ConnectorTransactionHandle transaction, ConnectorMetadata metadata) { + ConnectorMetadata existing = transactions.putIfAbsent(transaction, metadata); + Preconditions.checkState(existing == null, "transaction already exists: %s", existing); + } +} diff --git a/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java new file mode 100644 index 0000000000000..b317e57b32b85 --- /dev/null +++ b/presto-paimon/src/main/java/com/facebook/presto/paimon/PaimonTypeUtils.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import static java.lang.String.format; + +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.RealType; +import com.facebook.presto.common.type.SmallintType; +import com.facebook.presto.common.type.StandardTypes; +import com.facebook.presto.common.type.TimestampWithTimeZoneType; +import com.facebook.presto.common.type.TinyintType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.common.type.TypeSignature; +import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.common.type.VarbinaryType; +import com.facebook.presto.common.type.VarcharType; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +/** Presto type from Paimon Type. */ +public class PaimonTypeUtils { + + private PaimonTypeUtils() {} + + public static Type toPrestoType(DataType paimonType, TypeManager typeManager) { + if (paimonType instanceof CharType) { + return com.facebook.presto.common.type.CharType.createCharType( + Math.min( + com.facebook.presto.common.type.CharType.MAX_LENGTH, + ((CharType) paimonType).getLength())); + } else if (paimonType instanceof VarCharType) { + return VarcharType.createUnboundedVarcharType(); + } else if (paimonType instanceof BooleanType) { + return com.facebook.presto.common.type.BooleanType.BOOLEAN; + } else if (paimonType instanceof BinaryType) { + return VarbinaryType.VARBINARY; + } else if (paimonType instanceof VarBinaryType) { + return VarbinaryType.VARBINARY; + } else if (paimonType instanceof DecimalType) { + return com.facebook.presto.common.type.DecimalType.createDecimalType( + ((DecimalType) paimonType).getPrecision(), + ((DecimalType) paimonType).getScale()); + } else if (paimonType instanceof TinyIntType) { + return TinyintType.TINYINT; + } else if (paimonType instanceof SmallIntType) { + return SmallintType.SMALLINT; + } else if (paimonType instanceof IntType) { + return IntegerType.INTEGER; + } else if (paimonType instanceof BigIntType) { + return BigintType.BIGINT; + } else if (paimonType instanceof FloatType) { + return RealType.REAL; + } else if (paimonType instanceof DoubleType) { + return com.facebook.presto.common.type.DoubleType.DOUBLE; + } else if (paimonType instanceof DateType) { + return com.facebook.presto.common.type.DateType.DATE; + } else if (paimonType instanceof TimeType) { + return com.facebook.presto.common.type.TimeType.TIME; + } else if (paimonType instanceof TimestampType) { + return com.facebook.presto.common.type.TimestampType.TIMESTAMP; + } else if (paimonType instanceof LocalZonedTimestampType) { + return TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE; + } else if (paimonType instanceof ArrayType) { + DataType elementType = ((ArrayType) paimonType).getElementType(); + return new com.facebook.presto.common.type.ArrayType( + Objects.requireNonNull(toPrestoType(elementType, typeManager))); + } else if (paimonType instanceof MapType) { + MapType paimonMapType = (MapType) paimonType; + TypeSignature keyType = + Objects.requireNonNull(toPrestoType(paimonMapType.getKeyType(), typeManager)) + .getTypeSignature(); + TypeSignature valueType = + Objects.requireNonNull(toPrestoType(paimonMapType.getValueType(), typeManager)) + .getTypeSignature(); + return typeManager.getParameterizedType( + StandardTypes.MAP, + ImmutableList.of( + TypeSignatureParameter.of(keyType), + TypeSignatureParameter.of(valueType))); + } else if (paimonType instanceof RowType) { + RowType rowType = (RowType) paimonType; + List fields = + rowType.getFields().stream() + .map( + field -> + com.facebook.presto.common.type.RowType.field( + field.name(), + toPrestoType(field.type(), typeManager))) + .collect(Collectors.toList()); + return com.facebook.presto.common.type.RowType.from(fields); + } else { + throw new UnsupportedOperationException( + format("Cannot convert from Paimon type '%s' to Presto type", paimonType)); + } + } + + public static DataType toPaimonType(Type prestoType) { + if (prestoType instanceof com.facebook.presto.common.type.CharType) { + return DataTypes.CHAR( + ((com.facebook.presto.common.type.CharType) prestoType).getLength()); + } else if (prestoType instanceof VarcharType) { + return DataTypes.VARCHAR( + Math.min(Integer.MAX_VALUE, ((VarcharType) prestoType).getLength())); + } else if (prestoType instanceof com.facebook.presto.common.type.BooleanType) { + return DataTypes.BOOLEAN(); + } else if (prestoType instanceof VarbinaryType) { + // The varbinary in Presto currently does not accept the maximum length parameter, it is + // unbounded + return DataTypes.VARBINARY(Integer.MAX_VALUE); + } else if (prestoType instanceof com.facebook.presto.common.type.DecimalType) { + return DataTypes.DECIMAL( + ((com.facebook.presto.common.type.DecimalType) prestoType).getPrecision(), + ((com.facebook.presto.common.type.DecimalType) prestoType).getScale()); + } else if (prestoType instanceof TinyintType) { + return DataTypes.TINYINT(); + } else if (prestoType instanceof SmallintType) { + return DataTypes.SMALLINT(); + } else if (prestoType instanceof IntegerType) { + return DataTypes.INT(); + } else if (prestoType instanceof BigintType) { + return DataTypes.BIGINT(); + } else if (prestoType instanceof RealType) { + return DataTypes.FLOAT(); + } else if (prestoType instanceof com.facebook.presto.common.type.DoubleType) { + return DataTypes.DOUBLE(); + } else if (prestoType instanceof com.facebook.presto.common.type.DateType) { + return DataTypes.DATE(); + } else if (prestoType instanceof com.facebook.presto.common.type.TimeType) { + return new TimeType(); + } else if (prestoType instanceof com.facebook.presto.common.type.TimestampType) { + return DataTypes.TIMESTAMP(); + } else if (prestoType instanceof TimestampWithTimeZoneType) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); + } else if (prestoType instanceof com.facebook.presto.common.type.ArrayType) { + return DataTypes.ARRAY( + toPaimonType( + ((com.facebook.presto.common.type.ArrayType) prestoType) + .getElementType())); + } else if (prestoType instanceof com.facebook.presto.common.type.MapType) { + return DataTypes.MAP( + toPaimonType( + ((com.facebook.presto.common.type.MapType) prestoType).getKeyType()), + toPaimonType( + ((com.facebook.presto.common.type.MapType) prestoType).getValueType())); + } else if (prestoType instanceof com.facebook.presto.common.type.RowType) { + com.facebook.presto.common.type.RowType rowType = + (com.facebook.presto.common.type.RowType) prestoType; + AtomicInteger id = new AtomicInteger(0); + List dataFields = + rowType.getFields().stream() + .map( + field -> + new DataField( + id.getAndIncrement(), + field.getName().get(), + toPaimonType(field.getType()))) + .collect(Collectors.toList()); + return new RowType(true, dataFields); + } else { + throw new UnsupportedOperationException( + format("Cannot convert from Presto type '%s' to Paimon type", prestoType)); + } + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java new file mode 100644 index 0000000000000..40c41d5e8fe0d --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonColumnHandleTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.JsonSerdeUtil; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.JsonCodecFactory; +import com.facebook.airlift.json.JsonObjectMapperProvider; +import com.facebook.airlift.json.ObjectMapperProvider; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.type.TypeDeserializer; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import static com.facebook.presto.common.type.StandardTypes.VARCHAR; +import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature; +import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonColumnHandle}. */ +public class PaimonColumnHandleTest { + + @Test + public void testPrestoColumnHandle() { + VarCharType varCharType = VarCharType.stringType(true); + PaimonColumnHandle expected = + new PaimonColumnHandle( + "name", + JsonSerdeUtil.toJson(varCharType), + createTestFunctionAndTypeManager().getType(parseTypeSignature(VARCHAR))); + testRoundTrip(expected); + } + + private void testRoundTrip(PaimonColumnHandle expected) { + ObjectMapperProvider objectMapperProvider = new JsonObjectMapperProvider(); + objectMapperProvider.setJsonDeserializers( + ImmutableMap.of( + Type.class, new TypeDeserializer(createTestFunctionAndTypeManager()))); + JsonCodec codec = + new JsonCodecFactory(objectMapperProvider).jsonCodec(PaimonColumnHandle.class); + String json = codec.toJson(expected); + PaimonColumnHandle actual = codec.fromJson(json); + assertThat(actual).isEqualTo(expected); + assertThat(actual.getColumnName()).isEqualTo(expected.getColumnName()); + assertThat(actual.paimonType()).isEqualTo(expected.paimonType()); + assertThat(actual.getPrestoType()).isEqualTo(expected.getPrestoType()); + assertThat(actual.getTypeString()).isEqualTo(expected.getTypeString()); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java new file mode 100644 index 0000000000000..b91ce8526176f --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonPluginTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.facebook.presto.testing.TestingConnectorContext; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.UUID; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonPlugin}. */ +public class PaimonPluginTest { + + @Test + public void testCreatePaimonConnector() throws IOException { + String warehouse = + Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString(); + Plugin plugin = new PaimonPlugin(); + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + Connector connector = + factory.create( + "paimon", + ImmutableMap.of("warehouse", warehouse), + new TestingConnectorContext()); + assertThat(connector).isNotNull(); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java new file mode 100644 index 0000000000000..0e1d5dea5e83a --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonQueryRunner.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.airlift.log.Logger; +import com.facebook.airlift.log.Logging; +import com.facebook.presto.Session; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.facebook.presto.tpch.TpchPlugin; +import com.google.common.collect.ImmutableMap; + +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; + +/** The query runner of paimon. */ +public class PaimonQueryRunner { + + private static final Logger LOG = Logger.get(PaimonQueryRunner.class); + + private static final String PAIMON_CATALOG = "paimon"; + + private PaimonQueryRunner() {} + + public static DistributedQueryRunner createPrestoQueryRunner( + Map extraProperties) throws Exception { + return createPrestoQueryRunner(extraProperties, ImmutableMap.of(), false); + } + + public static DistributedQueryRunner createPrestoQueryRunner( + Map extraProperties, + Map extraConnectorProperties, + boolean createTpchTables) + throws Exception { + + Session session = testSessionBuilder().setCatalog(PAIMON_CATALOG).setSchema("tpch").build(); + + DistributedQueryRunner queryRunner = + DistributedQueryRunner.builder(session).setExtraProperties(extraProperties).build(); + + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + Path dataDir = queryRunner.getCoordinator().getDataDirectory().resolve("paimon_data"); + Path catalogDir = dataDir.getParent().resolve("catalog"); + + queryRunner.installPlugin(new PaimonPlugin()); + + Map options = + ImmutableMap.builder() + .put("warehouse", catalogDir.toFile().toURI().toString()) + .putAll(extraConnectorProperties) + .build(); + + queryRunner.createCatalog(PAIMON_CATALOG, PAIMON_CATALOG, options); + + queryRunner.execute("CREATE SCHEMA tpch"); + + // TODO + /*if (createTpchTables) { + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, session, TpchTable.getTables()); + }*/ + + return queryRunner; + } + + public static void main(String[] args) throws InterruptedException { + Logging.initialize(); + Map properties = + com.google.common.collect.ImmutableMap.of("http-server.http.port", "8080"); + DistributedQueryRunner queryRunner = null; + try { + queryRunner = createPrestoQueryRunner(properties); + } catch (Throwable t) { + LOG.error(t); + System.exit(1); + } + TimeUnit.MILLISECONDS.sleep(10); + Logger log = Logger.get(PaimonQueryRunner.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java new file mode 100644 index 0000000000000..dc1753fb061a6 --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonSplitTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.airlift.json.JsonCodec; +import org.testng.annotations.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonSplit}. */ +public class PaimonSplitTest { + + private final JsonCodec codec = JsonCodec.jsonCodec(PaimonSplit.class); + + @Test + public void testJsonRoundTrip() throws Exception { + byte[] serializedTable = TestPrestoUtils.getSerializedTable(); + PaimonSplit expected = new PaimonSplit(Arrays.toString(serializedTable)); + String json = codec.toJson(expected); + PaimonSplit actual = codec.fromJson(json); + assertThat(actual.getSplitSerialized()).isEqualTo(expected.getSplitSerialized()); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java new file mode 100644 index 0000000000000..5a233605a9537 --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTableHandleTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import com.facebook.airlift.json.JsonCodec; +import com.facebook.presto.common.predicate.TupleDomain; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonTableHandle}. */ +public class PaimonTableHandleTest { + + private final JsonCodec codec = JsonCodec.jsonCodec(PaimonTableHandle.class); + + @Test + public void testPrestoTableHandle() throws Exception { + byte[] serializedTable = TestPrestoUtils.getSerializedTable(); + PaimonTableHandle expected = + new PaimonTableHandle( + "test", "user", serializedTable, TupleDomain.all(), Optional.empty()); + testRoundTrip(expected); + } + + private void testRoundTrip(PaimonTableHandle expected) { + String json = codec.toJson(expected); + PaimonTableHandle actual = codec.fromJson(json); + assertThat(actual).isEqualTo(expected); + assertThat(actual.getSchemaName()).isEqualTo(expected.getSchemaName()); + assertThat(actual.getTableName()).isEqualTo(expected.getTableName()); + assertThat(actual.getSerializedTable()).isEqualTo(expected.getSerializedTable()); + assertThat(actual.getFilter()).isEqualTo(expected.getFilter()); + assertThat(actual.getProjectedColumns()).isEqualTo(expected.getProjectedColumns()); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java new file mode 100644 index 0000000000000..ae5bc418479f3 --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/PaimonTypeTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.VarCharType; + +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.CharType; +import com.facebook.presto.common.type.DateType; +import com.facebook.presto.common.type.DecimalType; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.RealType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.SmallintType; +import com.facebook.presto.common.type.StandardTypes; +import com.facebook.presto.common.type.TimestampType; +import com.facebook.presto.common.type.TimestampWithTimeZoneType; +import com.facebook.presto.common.type.TinyintType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.common.type.VarbinaryType; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.google.common.collect.ImmutableList; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link PaimonTypeUtils}. */ +public class PaimonTypeTest { + + @Test + public void testToPrestoType() { + Type charType = + PaimonTypeUtils.toPrestoType(DataTypes.CHAR(1), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(charType).getDisplayName()).isEqualTo("char(1)"); + + Type varCharType = + PaimonTypeUtils.toPrestoType( + DataTypes.VARCHAR(10), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(varCharType).getDisplayName()).isEqualTo("varchar"); + + Type booleanType = + PaimonTypeUtils.toPrestoType( + DataTypes.BOOLEAN(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(booleanType).getDisplayName()).isEqualTo("boolean"); + + Type binaryType = + PaimonTypeUtils.toPrestoType( + DataTypes.BINARY(10), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(binaryType).getDisplayName()).isEqualTo("varbinary"); + + Type varBinaryType = + PaimonTypeUtils.toPrestoType( + DataTypes.VARBINARY(10), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(varBinaryType).getDisplayName()).isEqualTo("varbinary"); + + assertThat( + PaimonTypeUtils.toPrestoType( + DataTypes.DECIMAL(38, 0), + createTestFunctionAndTypeManager()) + .getDisplayName()) + .isEqualTo("decimal(38,0)"); + + org.apache.paimon.types.DecimalType decimal = DataTypes.DECIMAL(2, 2); + assertThat( + PaimonTypeUtils.toPrestoType(decimal, createTestFunctionAndTypeManager()) + .getDisplayName()) + .isEqualTo("decimal(2,2)"); + + Type tinyIntType = + PaimonTypeUtils.toPrestoType( + DataTypes.TINYINT(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(tinyIntType).getDisplayName()).isEqualTo("tinyint"); + + Type smallIntType = + PaimonTypeUtils.toPrestoType( + DataTypes.SMALLINT(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(smallIntType).getDisplayName()).isEqualTo("smallint"); + + Type intType = + PaimonTypeUtils.toPrestoType(DataTypes.INT(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(intType).getDisplayName()).isEqualTo("integer"); + + Type bigIntType = + PaimonTypeUtils.toPrestoType( + DataTypes.BIGINT(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(bigIntType).getDisplayName()).isEqualTo("bigint"); + + Type doubleType = + PaimonTypeUtils.toPrestoType( + DataTypes.DOUBLE(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(doubleType).getDisplayName()).isEqualTo("double"); + + Type dateType = + PaimonTypeUtils.toPrestoType(DataTypes.DATE(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(dateType).getDisplayName()).isEqualTo("date"); + + Type timeType = + PaimonTypeUtils.toPrestoType(new TimeType(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(timeType).getDisplayName()).isEqualTo("time"); + + Type timestampType = + PaimonTypeUtils.toPrestoType( + DataTypes.TIMESTAMP(), createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(timestampType).getDisplayName()).isEqualTo("timestamp"); + + Type localZonedTimestampType = + PaimonTypeUtils.toPrestoType( + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(localZonedTimestampType).getDisplayName()) + .isEqualTo("timestamp with time zone"); + + Type mapType = + PaimonTypeUtils.toPrestoType( + DataTypes.MAP(DataTypes.BIGINT(), DataTypes.STRING()), + createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(mapType).getDisplayName()) + .isEqualTo("map(bigint, varchar)"); + + Type row = + PaimonTypeUtils.toPrestoType( + DataTypes.ROW( + new DataField(0, "id", new IntType()), + new DataField(1, "name", new VarCharType(Integer.MAX_VALUE))), + createTestFunctionAndTypeManager()); + assertThat(Objects.requireNonNull(row).getDisplayName()) + .isEqualTo("row(\"id\" integer, \"name\" varchar)"); + } + + @Test + public void testToPaimonType() { + DataType charType = PaimonTypeUtils.toPaimonType(CharType.createCharType(1)); + assertThat(charType.asSQLString()).isEqualTo("CHAR(1)"); + + DataType varCharType = + PaimonTypeUtils.toPaimonType(VarcharType.createUnboundedVarcharType()); + assertThat(varCharType.asSQLString()).isEqualTo("STRING"); + + DataType booleanType = PaimonTypeUtils.toPaimonType(BooleanType.BOOLEAN); + assertThat(booleanType.asSQLString()).isEqualTo("BOOLEAN"); + + DataType varbinaryType = PaimonTypeUtils.toPaimonType(VarbinaryType.VARBINARY); + assertThat(varbinaryType.asSQLString()).isEqualTo("BYTES"); + + DataType decimalType = PaimonTypeUtils.toPaimonType(DecimalType.createDecimalType()); + assertThat(decimalType.asSQLString()).isEqualTo("DECIMAL(38, 0)"); + + DataType tinyintType = PaimonTypeUtils.toPaimonType(TinyintType.TINYINT); + assertThat(tinyintType.asSQLString()).isEqualTo("TINYINT"); + + DataType smallintType = PaimonTypeUtils.toPaimonType(SmallintType.SMALLINT); + assertThat(smallintType.asSQLString()).isEqualTo("SMALLINT"); + + DataType intType = PaimonTypeUtils.toPaimonType(IntegerType.INTEGER); + assertThat(intType.asSQLString()).isEqualTo("INT"); + + DataType bigintType = PaimonTypeUtils.toPaimonType(BigintType.BIGINT); + assertThat(bigintType.asSQLString()).isEqualTo("BIGINT"); + + DataType floatType = PaimonTypeUtils.toPaimonType(RealType.REAL); + assertThat(floatType.asSQLString()).isEqualTo("FLOAT"); + + DataType doubleType = PaimonTypeUtils.toPaimonType(DoubleType.DOUBLE); + assertThat(doubleType.asSQLString()).isEqualTo("DOUBLE"); + + DataType dateType = PaimonTypeUtils.toPaimonType(DateType.DATE); + assertThat(dateType.asSQLString()).isEqualTo("DATE"); + + DataType timeType = + PaimonTypeUtils.toPaimonType(com.facebook.presto.common.type.TimeType.TIME); + assertThat(timeType.asSQLString()).isEqualTo("TIME(0)"); + + DataType timestampType = PaimonTypeUtils.toPaimonType(TimestampType.TIMESTAMP); + assertThat(timestampType.asSQLString()).isEqualTo("TIMESTAMP(6)"); + + DataType timestampWithTimeZoneType = + PaimonTypeUtils.toPaimonType(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE); + assertThat(timestampWithTimeZoneType.asSQLString()) + .isEqualTo("TIMESTAMP(6) WITH LOCAL TIME ZONE"); + + DataType arrayType = PaimonTypeUtils.toPaimonType(new ArrayType(IntegerType.INTEGER)); + assertThat(arrayType.asSQLString()).isEqualTo("ARRAY"); + + FunctionAndTypeManager testFunctionAndTypeManager = createTestFunctionAndTypeManager(); + Type parameterizedType = + testFunctionAndTypeManager.getParameterizedType( + StandardTypes.MAP, + ImmutableList.of( + TypeSignatureParameter.of(BigintType.BIGINT.getTypeSignature()), + TypeSignatureParameter.of( + VarcharType.createUnboundedVarcharType() + .getTypeSignature()))); + DataType mapType = PaimonTypeUtils.toPaimonType(parameterizedType); + assertThat(mapType.asSQLString()).isEqualTo("MAP"); + + List fields = new ArrayList<>(); + fields.add(new RowType.Field(java.util.Optional.of("id"), IntegerType.INTEGER)); + fields.add( + new RowType.Field( + java.util.Optional.of("name"), VarcharType.createUnboundedVarcharType())); + Type type = RowType.from(fields); + DataType rowType = PaimonTypeUtils.toPaimonType(type); + assertThat(rowType.asSQLString()).isEqualTo("ROW<`id` INT, `name` STRING>"); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java new file mode 100644 index 0000000000000..982df248afa2a --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/SimpleTableTestHelper.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.InnerTableCommit; +import org.apache.paimon.table.sink.InnerTableWrite; +import org.apache.paimon.types.RowType; + +import java.util.Collections; +import java.util.HashMap; + +/** A simple table test helper to write and commit. */ +public class SimpleTableTestHelper { + + private final InnerTableWrite writer; + private final InnerTableCommit commit; + + public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { + new SchemaManager(LocalFileIO.create(), path) + .createTable( + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.emptyList(), + new HashMap() { + { + put("write-mode", "change-log"); + } + }, + "")); + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path); + String user = "user"; + this.writer = table.newWrite(user); + this.commit = table.newCommit(user); + } + + public void write(InternalRow row) throws Exception { + writer.write(row); + } + + public void commit() throws Exception { + commit.commit(0, writer.prepareCommit(true, 0)); + } +} diff --git a/presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java b/presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java new file mode 100644 index 0000000000000..50572c9b8b718 --- /dev/null +++ b/presto-paimon/src/test/java/com/facebook/presto/paimon/TestPrestoUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.paimon; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.InstantiationUtil; + +import java.nio.file.Files; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.apache.paimon.data.BinaryString.fromString; + +/** paimon test util. */ +public class TestPrestoUtils { + + public static byte[] getSerializedTable() throws Exception { + String warehouse = + Files.createTempDirectory(UUID.randomUUID().toString()).toUri().toString(); + Path tablePath = new Path(warehouse, "test.db/user"); + SimpleTableTestHelper testHelper = createTestHelper(tablePath); + testHelper.write(GenericRow.of(1, 2L, fromString("1"), fromString("1"))); + testHelper.write(GenericRow.of(3, 4L, fromString("2"), fromString("2"))); + testHelper.write(GenericRow.of(5, 6L, fromString("3"), fromString("3"))); + testHelper.write( + GenericRow.ofKind(RowKind.DELETE, 3, 4L, fromString("2"), fromString("2"))); + testHelper.commit(); + Map config = new HashMap<>(); + config.put("warehouse", warehouse); + Catalog catalog = + CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(config))); + Identifier tablePath2 = new Identifier("test", "user"); + return InstantiationUtil.serializeObject(catalog.getTable(tablePath2)); + } + + private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception { + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "a", new IntType()), + new DataField(1, "b", new BigIntType()), + // test field name has upper case + new DataField(2, "aCa", new VarCharType()), + new DataField(3, "d", new CharType(1)))); + return new SimpleTableTestHelper(tablePath, rowType); + } +}