diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index befe97b6f49e..da0e683d11d3 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -45,10 +45,9 @@ To use Iceberg, you need: Configuration ------------- -The connector supports two Iceberg catalog types, you may use either a Hive -metastore service (HMS) or AWS Glue. The catalog type is determined by the -``iceberg.catalog.type`` property, it can be set to either ``HIVE_METASTORE`` -or ``GLUE``. +The connector supports multiple Iceberg catalog types, you may use either a Hive +metastore service (HMS), AWS Glue, or a REST catalog. The catalog type is determined by the +``iceberg.catalog.type`` property, it can be set to ``HIVE_METASTORE``, ``GLUE``, or ``REST``. .. _iceberg-hive-catalog: @@ -81,6 +80,43 @@ configuration properties as the Hive connector's Glue setup. See connector.name=iceberg iceberg.catalog.type=glue +REST catalog +^^^^^^^^^^^^^^ + +In order to use the Iceberg REST catalog, ensure to configure the catalog type with +``iceberg.catalog.type=rest`` and provide further details with the following +properties: + +============================================== ============================================================ +Property Name Description +============================================== ============================================================ +``iceberg.rest-catalog.uri`` REST server API endpoint URI (required). + Example: ``http://iceberg-with-rest:8181`` + +``iceberg.rest-catalog.security`` The type of security to use (default: ``NONE``). ``OAUTH2`` + requires either a ``token`` or ``credential``. + Example: ``OAUTH2`` + +``iceberg.rest-catalog.session`` Session information included when communicating with the REST Catalog. + Options are ``NONE`` or ``USER`` (default: ``NONE``). + +``iceberg.rest-catalog.oauth2.token`` The Bearer token which will be used for interactions + with the server. A ``token`` or ``credential`` is required for + ``OAUTH2`` security. + Example: ``AbCdEf123456`` + +``iceberg.rest-catalog.oauth2.credential`` The credential to exchange for a token in the OAuth2 client + credentials flow with the server. A ``token`` or ``credential`` + is required for ``OAUTH2`` security. + Example: ``AbCdEf123456`` +============================================== ============================================================ + +.. code-block:: text + + connector.name=iceberg + iceberg.catalog.type=rest + iceberg.rest-catalog.uri=http://iceberg-with-rest:8181 + General configuration ^^^^^^^^^^^^^^^^^^^^^ diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 48db13121363..51a5719b1a28 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -46,6 +46,10 @@ com.linkedin.calcite calcite-core + + io.airlift + http-client + @@ -145,6 +149,21 @@ guice + + io.jsonwebtoken + jjwt-api + + + + io.jsonwebtoken + jjwt-impl + + + + io.jsonwebtoken + jjwt-jackson + + javax.inject javax.inject @@ -238,6 +257,27 @@ runtime + + org.apache.httpcomponents.client5 + httpclient5 + 5.1 + runtime + + + + org.apache.httpcomponents.core5 + httpcore5 + 5.1.1 + runtime + + + + org.apache.iceberg + iceberg-bundled-guava + ${dep.iceberg.version} + runtime + + org.apache.iceberg iceberg-orc @@ -345,6 +385,18 @@ test + + io.airlift + http-server + test + + + + io.airlift + node + test + + io.airlift testing @@ -367,6 +419,19 @@ + + javax.servlet + javax.servlet-api + test + + + + org.apache.iceberg + iceberg-core + tests + test + + org.assertj assertj-core @@ -378,6 +443,13 @@ testng test + + + org.xerial + sqlite-jdbc + 3.36.0.3 + test + diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java index 35f46922a785..0e972cc41538 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java @@ -18,5 +18,6 @@ public enum CatalogType TESTING_FILE_METASTORE, HIVE_METASTORE, GLUE, + REST, /**/; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java index 153294a4be23..ac7b8149d810 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java @@ -21,10 +21,12 @@ import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule; import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; +import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogModule; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; +import static io.trino.plugin.iceberg.CatalogType.REST; import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; public class IcebergCatalogModule @@ -36,6 +38,7 @@ protected void setup(Binder binder) bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule()); bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); + bindCatalogModule(REST, new IcebergRestCatalogModule()); } private void bindCatalogModule(CatalogType catalogType, Module module) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java new file mode 100644 index 000000000000..e4ad6590a8b2 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogConfig.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.NotNull; + +import java.net.URI; + +public class IcebergRestCatalogConfig +{ + public enum Security + { + NONE, + OAUTH2, + } + + public enum SessionType + { + NONE, + USER + } + + private URI restUri; + private Security security = Security.NONE; + private SessionType sessionType = SessionType.NONE; + + @NotNull + public URI getBaseUri() + { + return this.restUri; + } + + @Config("iceberg.rest-catalog.uri") + @ConfigDescription("The URI to the REST server") + public IcebergRestCatalogConfig setBaseUri(String uri) + { + if (uri != null) { + this.restUri = URI.create(uri); + } + return this; + } + + @NotNull + public Security getSecurity() + { + return security; + } + + @Config("iceberg.rest-catalog.security") + @ConfigDescription("Authorization protocol to use when communicating with the REST catalog server") + public IcebergRestCatalogConfig setSecurity(Security security) + { + this.security = security; + return this; + } + + @NotNull + public IcebergRestCatalogConfig.SessionType getSessionType() + { + return sessionType; + } + + @Config("iceberg.rest-catalog.session") + @ConfigDescription("Type of REST catalog sessionType to use when communicating with REST catalog Server") + public IcebergRestCatalogConfig setSessionType(SessionType sessionType) + { + this.sessionType = sessionType; + return this; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java new file mode 100644 index 000000000000..8950212bbbaf --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/IcebergRestCatalogModule.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.inject.Binder; +import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.Security; + +import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class IcebergRestCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(IcebergRestCatalogConfig.class); + install(conditionalModule( + IcebergRestCatalogConfig.class, + config -> config.getSecurity() == Security.OAUTH2, + new OAuth2SecurityModule(), + new NoneSecurityModule())); + + binder.bind(TrinoCatalogFactory.class).to(TrinoIcebergRestCatalogFactory.class).in(Scopes.SINGLETON); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/NoneSecurityModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/NoneSecurityModule.java new file mode 100644 index 000000000000..5aef37346e2e --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/NoneSecurityModule.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.inject.Binder; +import io.airlift.configuration.AbstractConfigurationAwareModule; + +public class NoneSecurityModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(SecurityProperties.class).to(NoneSecurityProperties.class); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/NoneSecurityProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/NoneSecurityProperties.java new file mode 100644 index 000000000000..4e377c37bc8c --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/NoneSecurityProperties.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class NoneSecurityProperties + implements SecurityProperties +{ + @Override + public Map get() + { + return ImmutableMap.of(); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java new file mode 100644 index 000000000000..e585fdeefd67 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityConfig.java @@ -0,0 +1,62 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.ConfigSecuritySensitive; + +import javax.validation.constraints.AssertTrue; + +import java.util.Optional; + +public class OAuth2SecurityConfig +{ + private String credential; + private String token; + + public Optional getCredential() + { + return Optional.ofNullable(credential); + } + + @Config("iceberg.rest-catalog.oauth2.credential") + @ConfigDescription("The credential to exchange for a token in the OAuth2 client credentials flow with the server") + @ConfigSecuritySensitive + public OAuth2SecurityConfig setCredential(String credential) + { + this.credential = credential; + return this; + } + + public Optional getToken() + { + return Optional.ofNullable(token); + } + + @Config("iceberg.rest-catalog.oauth2.token") + @ConfigDescription("The Bearer token which will be used for interactions with the server") + @ConfigSecuritySensitive + public OAuth2SecurityConfig setToken(String token) + { + this.token = token; + return this; + } + + @AssertTrue(message = "OAuth2 requires a credential or token") + public boolean credentialOrTokenPresent() + { + return credential != null || token != null; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityModule.java new file mode 100644 index 000000000000..1a762894a0d5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityModule.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.inject.Binder; +import io.airlift.configuration.AbstractConfigurationAwareModule; + +import static io.airlift.configuration.ConfigBinder.configBinder; + +public class OAuth2SecurityModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(OAuth2SecurityConfig.class); + binder.bind(SecurityProperties.class).to(OAuth2SecurityProperties.class); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java new file mode 100644 index 000000000000..531b6df99fe5 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/OAuth2SecurityProperties.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.rest.auth.OAuth2Properties; + +import javax.inject.Inject; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class OAuth2SecurityProperties + implements SecurityProperties +{ + private final Map securityProperties; + + @Inject + public OAuth2SecurityProperties(OAuth2SecurityConfig securityConfig) + { + requireNonNull(securityConfig, "securityConfig is null"); + + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); + securityConfig.getCredential().ifPresent( + value -> propertiesBuilder.put(OAuth2Properties.CREDENTIAL, value)); + securityConfig.getToken().ifPresent( + value -> propertiesBuilder.put(OAuth2Properties.TOKEN, value)); + + this.securityProperties = propertiesBuilder.buildOrThrow(); + } + + @Override + public Map get() + { + return securityProperties; + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/SecurityProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/SecurityProperties.java new file mode 100644 index 000000000000..16395d049f75 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/SecurityProperties.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import java.util.Map; + +public interface SecurityProperties +{ + Map get(); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java new file mode 100644 index 000000000000..33528b69f640 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.trino.hdfs.ConfigurationUtils; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType; +import io.trino.spi.security.ConnectorIdentity; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.rest.RESTSessionCatalog; + +import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; + +import java.net.URI; + +import static java.util.Objects.requireNonNull; + +public class TrinoIcebergRestCatalogFactory + implements TrinoCatalogFactory +{ + private final CatalogName catalogName; + private final String trinoVersion; + private final URI serverUri; + private final SessionType sessionType; + private final SecurityProperties securityProperties; + private final boolean uniqueTableLocation; + + @GuardedBy("this") + private RESTSessionCatalog icebergCatalog; + + @Inject + public TrinoIcebergRestCatalogFactory( + CatalogName catalogName, + IcebergRestCatalogConfig restConfig, + SecurityProperties securityProperties, + IcebergConfig icebergConfig, + NodeVersion nodeVersion) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); + requireNonNull(restConfig, "restConfig is null"); + this.serverUri = restConfig.getBaseUri(); + this.sessionType = restConfig.getSessionType(); + this.securityProperties = requireNonNull(securityProperties, "securityProperties is null"); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.uniqueTableLocation = icebergConfig.isUniqueTableLocation(); + } + + @Override + public synchronized TrinoCatalog create(ConnectorIdentity identity) + { + // Creation of the RESTSessionCatalog is lazy due to required network calls + // for authorization and config route + if (icebergCatalog == null) { + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(CatalogProperties.URI, serverUri.toString()); + properties.put("trino-version", trinoVersion); + properties.putAll(securityProperties.get()); + RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog(); + icebergCatalogInstance.setConf(ConfigurationUtils.getInitialConfiguration()); + icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow()); + + icebergCatalog = icebergCatalogInstance; + } + + return new TrinoRestCatalog(icebergCatalog, catalogName, sessionType, trinoVersion, uniqueTableLocation); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java new file mode 100644 index 000000000000..e1f1a1f8e0ce --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -0,0 +1,432 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import io.jsonwebtoken.impl.DefaultJwtBuilder; +import io.jsonwebtoken.jackson.io.JacksonSerializer; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.iceberg.ColumnIdentity; +import io.trino.plugin.iceberg.IcebergSchemaProperties; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.security.TrinoPrincipal; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.SessionCatalog.SessionContext; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.RESTException; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.rest.auth.OAuth2Properties; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; + +public class TrinoRestCatalog + implements TrinoCatalog +{ + private final RESTSessionCatalog restSessionCatalog; + private final CatalogName catalogName; + private final SessionType sessionType; + private final String trinoVersion; + private final boolean useUniqueTableLocation; + + private final Map tableCache = new ConcurrentHashMap<>(); + + public TrinoRestCatalog( + RESTSessionCatalog restSessionCatalog, + CatalogName catalogName, + SessionType sessionType, + String trinoVersion, + boolean useUniqueTableLocation) + { + this.restSessionCatalog = requireNonNull(restSessionCatalog, "restSessionCatalog is null"); + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.sessionType = requireNonNull(sessionType, "sessionType is null"); + this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); + this.useUniqueTableLocation = useUniqueTableLocation; + } + + @Override + public boolean namespaceExists(ConnectorSession session, String namespace) + { + return restSessionCatalog.namespaceExists(convert(session), Namespace.of(namespace)); + } + + @Override + public List listNamespaces(ConnectorSession session) + { + return restSessionCatalog.listNamespaces(convert(session)).stream() + .map(Namespace::toString) + .collect(toImmutableList()); + } + + @Override + public void dropNamespace(ConnectorSession session, String namespace) + { + try { + restSessionCatalog.dropNamespace(convert(session), Namespace.of(namespace)); + } + catch (NoSuchNamespaceException e) { + throw new SchemaNotFoundException(namespace); + } + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + try { + // Return immutable metadata as direct modifications will not be reflected on the namespace + return ImmutableMap.copyOf(restSessionCatalog.loadNamespaceMetadata(convert(session), Namespace.of(namespace))); + } + catch (NoSuchNamespaceException e) { + throw new SchemaNotFoundException(namespace); + } + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + // the REST specification currently does not have a way of defining ownership + return Optional.empty(); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + restSessionCatalog.createNamespace( + convert(session), + Namespace.of(namespace), + Maps.transformValues(properties, property -> { + if (property instanceof String stringProperty) { + return stringProperty; + } + throw new TrinoException(NOT_SUPPORTED, "Non-string properties are not support for Iceberg REST catalog"); + })); + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setNamespacePrincipal is not supported for Iceberg REST catalog"); + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for Iceberg REST catalog"); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + SessionContext sessionContext = convert(session); + List namespaces; + + if (namespace.isPresent() && namespaceExists(session, namespace.get())) { + namespaces = ImmutableList.of(Namespace.of(namespace.get())); + } + else { + namespaces = listNamespaces(session).stream() + .map(Namespace::of) + .collect(toImmutableList()); + } + + ImmutableList.Builder tables = ImmutableList.builder(); + for (Namespace restNamespace : namespaces) { + try { + tables.addAll( + restSessionCatalog.listTables(sessionContext, restNamespace).stream() + .map(id -> SchemaTableName.schemaTableName(id.namespace().toString(), id.name())) + .collect(toImmutableList())); + } + catch (NoSuchNamespaceException e) { + // Namespace may have been deleted during listing + } + } + return tables.build(); + } + + @Override + public Transaction newCreateTableTransaction( + ConnectorSession session, + SchemaTableName schemaTableName, + Schema schema, + PartitionSpec partitionSpec, + String location, + Map properties) + { + return restSessionCatalog.buildTable(convert(session), toIdentifier(schemaTableName), schema) + .withPartitionSpec(partitionSpec) + .withLocation(location) + .withProperties(properties) + .createTransaction(); + } + + @Override + public void registerTable(ConnectorSession session, SchemaTableName tableName, String tableLocation, String metadataLocation) + { + throw new TrinoException(NOT_SUPPORTED, "registerTable is not supported for Iceberg REST catalog"); + } + + @Override + public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) + { + if (!restSessionCatalog.dropTable(convert(session), toIdentifier(schemaTableName))) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName)); + } + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + try { + restSessionCatalog.renameTable(convert(session), toIdentifier(from), toIdentifier(to)); + } + catch (RESTException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e); + } + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) + { + try { + return tableCache.computeIfAbsent( + schemaTableName.toString(), + key -> { + BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); + // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names + return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName)); + }); + } + catch (NoSuchTableException e) { + throw new TableNotFoundException(schemaTableName, e); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e); + } + } + + @Override + public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) + { + Table icebergTable = restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); + if (comment.isEmpty()) { + icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); + } + else { + icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); + } + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + String tableName = createLocationForTable(schemaTableName.getTableName()); + + Map properties = loadNamespaceMetadata(session, schemaTableName.getSchemaName()); + String databaseLocation = (String) properties.get(IcebergSchemaProperties.LOCATION_PROPERTY); + checkArgument(databaseLocation != null, "location must be set for %s", schemaTableName.getSchemaName()); + + if (databaseLocation.endsWith("/")) { + return databaseLocation + tableName; + } + return join("/", databaseLocation, tableName); + } + + private String createLocationForTable(String baseTableName) + { + String tableName = baseTableName; + if (useUniqueTableLocation) { + tableName += "-" + randomUUID().toString().replace("-", ""); + } + return tableName; + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported for Iceberg REST catalog"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, "createView is not supported for Iceberg REST catalog"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for Iceberg REST catalog"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported for Iceberg REST catalog"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for Iceberg REST catalog"); + } + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + return ImmutableMap.of(); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewName) + { + return Optional.empty(); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for Iceberg REST catalog"); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName) + { + return Optional.empty(); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg REST catalog"); + } + + @Override + public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional comment) + { + loadTable(session, schemaTableName).updateSchema() + .updateColumnDoc(columnIdentity.getName(), comment.orElse(null)) + .commit(); + } + + @Override + public Optional redirectTable(ConnectorSession session, SchemaTableName tableName) + { + return Optional.empty(); + } + + @Override + public void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "updateViewComment is not supported for Iceberg REST catalog"); + } + + @Override + public void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional comment) + { + throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for Iceberg REST catalog"); + } + + private SessionCatalog.SessionContext convert(ConnectorSession session) + { + return switch (sessionType) { + case NONE -> SessionCatalog.SessionContext.createEmpty(); + case USER -> { + String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default")); + + Map properties = ImmutableMap.of( + "user", session.getUser(), + "source", session.getSource().orElse(""), + "trinoCatalog", catalogName.toString(), + "trinoVersion", trinoVersion); + + Map claims = ImmutableMap.builder() + .putAll(properties) + .buildOrThrow(); + + String subjectJwt = new DefaultJwtBuilder() + .setSubject(session.getUser()) + .setIssuer(trinoVersion) + .setIssuedAt(new Date()) + .addClaims(claims) + .serializeToJsonWith(new JacksonSerializer<>()) + .compact(); + + Map credentials = ImmutableMap.builder() + .putAll(session.getIdentity().getExtraCredentials()) + .put(OAuth2Properties.JWT_TOKEN_TYPE, subjectJwt) + .buildOrThrow(); + + yield new SessionCatalog.SessionContext(sessionId, session.getUser(), credentials, properties); + } + }; + } + + private static TableIdentifier toIdentifier(SchemaTableName schemaTableName) + { + return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index a8a062048988..df40ac66ef59 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -141,7 +141,7 @@ public abstract class BaseIcebergConnectorTest { private static final Pattern WITH_CLAUSE_EXTRACTOR = Pattern.compile(".*(WITH\\s*\\([^)]*\\))\\s*$", Pattern.DOTALL); - private final IcebergFileFormat format; + protected final IcebergFileFormat format; protected BaseIcebergConnectorTest(IcebergFileFormat format) { @@ -4299,11 +4299,19 @@ public void testSplitPruningForFilterOnNonPartitionColumn(DataMappingTestSetup t public void testGetIcebergTableProperties() { assertUpdate("CREATE TABLE test_iceberg_get_table_props (x BIGINT)"); - assertThat(query("SELECT * FROM \"test_iceberg_get_table_props$properties\"")) - .matches(format("VALUES (VARCHAR 'write.format.default', VARCHAR '%s')", format.name())); + verifyIcebergTableProperties(computeActual("SELECT * FROM \"test_iceberg_get_table_props$properties\"")); dropTable("test_iceberg_get_table_props"); } + protected void verifyIcebergTableProperties(MaterializedResult actual) + { + assertThat(actual).isNotNull(); + MaterializedResult expected = resultBuilder(getSession()) + .row("write.format.default", format.name()) + .build(); + assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows()); + } + protected abstract boolean supportsIcebergFileStatistics(String typeName); @Test(dataProvider = "testDataMappingSmokeTestDataProvider") @@ -6143,9 +6151,7 @@ public void testRenameTableToLongTableName() String baseTableName = "test_rename_target_" + randomNameSuffix(); - int maxLength = 255; - - String validTargetTableName = baseTableName + "z".repeat(maxLength - baseTableName.length()); + String validTargetTableName = baseTableName + "z".repeat(maxTableRenameLength() - baseTableName.length()); assertUpdate("ALTER TABLE " + sourceTableName + " RENAME TO " + validTargetTableName); assertTrue(getQueryRunner().tableExists(getSession(), validTargetTableName)); assertQuery("SELECT x FROM " + validTargetTableName, "VALUES 123"); @@ -6158,6 +6164,11 @@ public void testRenameTableToLongTableName() assertFalse(getQueryRunner().tableExists(getSession(), invalidTargetTableName)); } + protected int maxTableRenameLength() + { + return 255; + } + @Override protected OptionalInt maxTableNameLength() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index 88b5d944058c..839f2600a073 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -220,6 +220,20 @@ public void testIcebergPluginFailsWhenIncorrectPropertyProvided() .hasMessageContaining("Configuration property 'hive.hive-views.enabled' was not used"); } + @Test + public void testRestCatalog() + { + ConnectorFactory factory = getConnectorFactory(); + + factory.create( + "test", + Map.of( + "iceberg.catalog.type", "rest", + "iceberg.rest-catalog.uri", "https://foo:1234"), + new TestingConnectorContext()) + .shutdown(); + } + private static ConnectorFactory getConnectorFactory() { return getOnlyElement(new IcebergPlugin().getConnectorFactories()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 8caad178604e..fe946aa07e8e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -132,6 +132,7 @@ public void testCreateTable() String namespace = "test_create_table_" + randomNameSuffix(); String table = "tableName"; SchemaTableName schemaTableName = new SchemaTableName(namespace, table); + Map tableProperties = Map.of("test_key", "test_value"); try { catalog.createNamespace(SESSION, namespace, defaultNamespaceProperties(namespace), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); String tableLocation = arbitraryTableLocation(catalog, SESSION, schemaTableName); @@ -141,7 +142,7 @@ public void testCreateTable() new Schema(Types.NestedField.of(1, true, "col1", Types.LongType.get())), PartitionSpec.unpartitioned(), tableLocation, - ImmutableMap.of()) + tableProperties) .commitTransaction(); assertThat(catalog.listTables(SESSION, Optional.of(namespace))).contains(schemaTableName); assertThat(catalog.listTables(SESSION, Optional.empty())).contains(schemaTableName); @@ -152,7 +153,7 @@ public void testCreateTable() assertEquals(icebergTable.schema().columns().get(0).name(), "col1"); assertEquals(icebergTable.schema().columns().get(0).type(), Types.LongType.get()); assertEquals(icebergTable.location(), tableLocation); - assertEquals(icebergTable.properties(), ImmutableMap.of()); + assertThat(icebergTable.properties()).containsAllEntriesOf(tableProperties); catalog.dropTable(SESSION, schemaTableName); assertThat(catalog.listTables(SESSION, Optional.of(namespace))).doesNotContain(schemaTableName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java new file mode 100644 index 000000000000..cc0961b86804 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.hdfs.DynamicHdfsConfiguration; +import io.trino.hdfs.HdfsConfig; +import io.trino.hdfs.HdfsConfiguration; +import io.trino.hdfs.HdfsConfigurationInitializer; +import io.trino.hdfs.HdfsContext; +import io.trino.hdfs.HdfsEnvironment; +import io.trino.hdfs.authentication.NoHdfsAuthentication; +import io.trino.spi.connector.ConnectorSession; +import io.trino.testing.TestingConnectorSession; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.jdbc.JdbcCatalog; +import org.assertj.core.util.Files; + +import java.io.File; + +public final class RestCatalogTestUtils +{ + private RestCatalogTestUtils() {} + + public static Catalog backendCatalog(File warehouseLocation) + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(CatalogProperties.URI, "jdbc:h2:file:" + Files.newTemporaryFile().getAbsolutePath()); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation.toPath().resolve("iceberg_data").toFile().getAbsolutePath()); + + ConnectorSession connectorSession = TestingConnectorSession.builder().build(); + HdfsConfig hdfsConfig = new HdfsConfig(); + HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of()); + HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()); + HdfsContext context = new HdfsContext(connectorSession); + + JdbcCatalog catalog = new JdbcCatalog(); + catalog.setConf(hdfsEnvironment.getConfiguration(context, new Path(warehouseLocation.getAbsolutePath()))); + catalog.initialize("backend_jdbc", properties.buildOrThrow()); + + return catalog; + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java new file mode 100644 index 000000000000..aebe933b8b37 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestIcebergRestCatalogConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(IcebergRestCatalogConfig.class) + .setBaseUri(null) + .setSessionType(IcebergRestCatalogConfig.SessionType.NONE) + .setSecurity(IcebergRestCatalogConfig.Security.NONE)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("iceberg.rest-catalog.uri", "http://localhost:1234") + .put("iceberg.rest-catalog.security", "OAUTH2") + .put("iceberg.rest-catalog.session", "USER") + .buildOrThrow(); + + IcebergRestCatalogConfig expected = new IcebergRestCatalogConfig() + .setBaseUri("http://localhost:1234") + .setSessionType(IcebergRestCatalogConfig.SessionType.USER) + .setSecurity(IcebergRestCatalogConfig.Security.OAUTH2); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..72ebe12515c0 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java @@ -0,0 +1,166 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.airlift.http.server.testing.TestingHttpServer; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.rest.DelegatingRestSessionCatalog; +import org.assertj.core.util.Files; + +import java.io.File; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergRestCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + public TestIcebergRestCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + } + + @SuppressWarnings("DuplicateBranchesInSwitch") + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_RENAME_SCHEMA -> false; + case SUPPORTS_CREATE_VIEW, SUPPORTS_COMMENT_ON_VIEW, SUPPORTS_COMMENT_ON_VIEW_COLUMN -> false; + case SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_RENAME_MATERIALIZED_VIEW -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + File warehouseLocation = Files.newTemporaryFolder(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); + + Catalog backend = backendCatalog(warehouseLocation); + + DelegatingRestSessionCatalog delegatingCatalog = DelegatingRestSessionCatalog.builder() + .delegate(backend) + .build(); + + TestingHttpServer testServer = delegatingCatalog.testServer(); + testServer.start(); + closeAfterClass(testServer::stop); + + return IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "rest") + .put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString()) + .buildOrThrow()) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); + } + + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasMessageContaining("createView is not supported for Iceberg REST catalog"); + } + + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog"); + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + // used when registering a table, which is not supported by the REST catalog + } + + @Override + protected String getMetadataLocation(String tableName) + { + // used when registering a table, which is not supported by the REST catalog + throw new UnsupportedOperationException("metadata location for register_table is not supported"); + } + + @Override + public void testRegisterTableWithTableLocation() + { + assertThatThrownBy(super::testRegisterTableWithTableLocation) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithComments() + { + assertThatThrownBy(super::testRegisterTableWithComments) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithShowCreateTable() + { + assertThatThrownBy(super::testRegisterTableWithShowCreateTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithReInsert() + { + assertThatThrownBy(super::testRegisterTableWithReInsert) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithDroppedTable() + { + assertThatThrownBy(super::testRegisterTableWithDroppedTable) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithDifferentTableName() + { + assertThatThrownBy(super::testRegisterTableWithDifferentTableName) + .hasMessageContaining("register_table procedure is disabled"); + } + + @Override + public void testRegisterTableWithMetadataFile() + { + assertThatThrownBy(super::testRegisterTableWithMetadataFile) + .hasMessageContaining("metadata location for register_table is not supported"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java new file mode 100644 index 000000000000..39c54c7defaa --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestOAuth2SecurityConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static org.testng.Assert.assertTrue; + +public class TestOAuth2SecurityConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(OAuth2SecurityConfig.class) + .setCredential(null) + .setToken(null)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("iceberg.rest-catalog.oauth2.token", "token") + .put("iceberg.rest-catalog.oauth2.credential", "credential") + .buildOrThrow(); + + OAuth2SecurityConfig expected = new OAuth2SecurityConfig() + .setCredential("credential") + .setToken("token"); + assertTrue(expected.credentialOrTokenPresent()); + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java new file mode 100644 index 000000000000..5d88bcfa81f7 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.iceberg.CommitTaskData; +import io.trino.plugin.iceberg.IcebergMetadata; +import io.trino.plugin.iceberg.TableStatisticsWriter; +import io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.security.PrincipalType; +import io.trino.spi.security.TrinoPrincipal; +import org.apache.iceberg.rest.DelegatingRestSessionCatalog; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.assertj.core.util.Files; + +import java.io.File; + +import static io.airlift.json.JsonCodec.jsonCodec; +import static io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType.NONE; +import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT; +import static io.trino.testing.TestingConnectorSession.SESSION; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.util.Locale.ENGLISH; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestTrinoRestCatalog + extends BaseTrinoCatalogTest +{ + @Override + protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) + { + File warehouseLocation = Files.newTemporaryFolder(); + warehouseLocation.deleteOnExit(); + + String catalogName = "iceberg_rest"; + RESTSessionCatalog restSessionCatalog = DelegatingRestSessionCatalog + .builder() + .delegate(backendCatalog(warehouseLocation)) + .build(); + + restSessionCatalog.initialize(catalogName, ImmutableMap.of()); + + return new TrinoRestCatalog(restSessionCatalog, new CatalogName(catalogName), NONE, "test", useUniqueTableLocations); + } + + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasMessageContaining("createView is not supported for Iceberg REST catalog"); + } + + @Override + public void testNonLowercaseNamespace() + { + TrinoCatalog catalog = createTrinoCatalog(false); + + String namespace = "testNonLowercaseNamespace" + randomTableSuffix(); + String schema = namespace.toLowerCase(ENGLISH); + + catalog.createNamespace(SESSION, namespace, defaultNamespaceProperties(namespace), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + try { + assertThat(catalog.namespaceExists(SESSION, namespace)).as("catalog.namespaceExists(namespace)") + .isTrue(); + assertThat(catalog.namespaceExists(SESSION, schema)).as("catalog.namespaceExists(schema)") + .isFalse(); + assertThat(catalog.listNamespaces(SESSION)).as("catalog.listNamespaces") + // Catalog listNamespaces may be used as a default implementation for ConnectorMetadata.schemaExists + .doesNotContain(schema) + .contains(namespace); + + // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class + ConnectorMetadata icebergMetadata = new IcebergMetadata( + PLANNER_CONTEXT.getTypeManager(), + jsonCodec(CommitTaskData.class), + catalog, + connectorIdentity -> { + throw new UnsupportedOperationException(); + }, + new TableStatisticsWriter(new NodeVersion("test-version"))); + assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") + .isTrue(); + assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)") + .isFalse(); + assertThat(icebergMetadata.listSchemaNames(SESSION)).as("icebergMetadata.listSchemaNames") + .doesNotContain(schema) + .contains(namespace); + } + finally { + catalog.dropNamespace(SESSION, namespace); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalogConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalogConnectorTest.java new file mode 100644 index 000000000000..c5494c9763bd --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalogConnectorTest.java @@ -0,0 +1,172 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.airlift.http.server.testing.TestingHttpServer; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.plugin.iceberg.TestIcebergParquetConnectorTest; +import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.rest.DelegatingRestSessionCatalog; +import org.assertj.core.util.Files; + +import java.io.File; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestTrinoRestCatalogConnectorTest + extends TestIcebergParquetConnectorTest +{ + @SuppressWarnings("DuplicateBranchesInSwitch") + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_RENAME_SCHEMA -> false; + case SUPPORTS_CREATE_VIEW, SUPPORTS_COMMENT_ON_VIEW, SUPPORTS_COMMENT_ON_VIEW_COLUMN -> false; + case SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_RENAME_MATERIALIZED_VIEW -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + File warehouseLocation = Files.newTemporaryFolder(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); + + Catalog backend = backendCatalog(warehouseLocation); + + DelegatingRestSessionCatalog delegatingCatalog = DelegatingRestSessionCatalog.builder() + .delegate(backend) + .build(); + + TestingHttpServer testServer = delegatingCatalog.testServer(); + testServer.start(); + closeAfterClass(testServer::stop); + + return IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.file-format", format.name()) + .put("iceberg.catalog.type", "rest") + .put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString()) + .buildOrThrow()) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); + } + + @Override + protected OptionalInt maxSchemaNameLength() + { + // h2 test database backend limit + return OptionalInt.of(255); + } + + @Override + protected void verifySchemaNameLengthFailurePermissible(Throwable e) + { + assertThat(e).hasMessageContaining("Failed to execute"); + } + + @Override + protected OptionalInt maxTableNameLength() + { + // This value depends on metastore backend limit + // The connector appends uuids to the end of all table names + // 33 is the length of random suffix. e.g. {table name}-142763c594d54e4b9329a98f90528caf + return OptionalInt.of(255 - 33); + } + + @Override + protected void verifyTableNameLengthFailurePermissible(Throwable e) + { + assertThat(e).hasMessageMatching(".*Failed to create.*|.*Failed to execute.*|.*Failed to rename.*"); + } + + @Override + protected int maxTableRenameLength() + { + // h2 test database backend limit + return 255; + } + + @Override + protected void verifyConcurrentAddColumnFailurePermissible(Exception e) + { + assertThat(e) + .getCause() + .hasMessageContaining("Commit failed: Requirement failed: last assigned field id changed"); + } + + @Override + public void testShowCreateSchema() + { + // Overridden due to REST catalog not supporting namespace principal + assertThat(computeActual("SHOW CREATE SCHEMA tpch").getOnlyValue().toString()) + .matches("\\QCREATE SCHEMA iceberg.tpch\n" + + "WITH (\n" + + " location = '\\E.*\\Q/iceberg_data/tpch'\n" + + ")\\E"); + } + + @Override + protected void verifyIcebergTableProperties(MaterializedResult actual) + { + assertThat(actual) + .anySatisfy(row -> assertThat(row).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, "write.format.default", this.format.name()))) + .anySatisfy(row -> assertThat(row.getFields()).contains("created-at")); + } + + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasMessageContaining("createView is not supported for Iceberg REST catalog"); + } + + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Override + public void testFederatedMaterializedView() + { + assertThatThrownBy(super::testFederatedMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java new file mode 100644 index 000000000000..b56069650e10 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.iceberg.rest; + +import com.google.common.collect.ImmutableMap; +import io.airlift.http.server.HttpServerConfig; +import io.airlift.http.server.HttpServerInfo; +import io.airlift.http.server.testing.TestingHttpServer; +import io.airlift.node.NodeInfo; +import org.apache.iceberg.catalog.Catalog; + +import java.io.Closeable; +import java.io.IOException; + +import static java.util.Objects.requireNonNull; + +public class DelegatingRestSessionCatalog + extends RESTSessionCatalog +{ + private RESTCatalogAdapter adapter; + private Catalog delegate; + + // to make sure it is instantiated via Builder + private DelegatingRestSessionCatalog() {} + + DelegatingRestSessionCatalog(RESTCatalogAdapter adapter, Catalog delegate) + { + super(properties -> adapter); + this.adapter = requireNonNull(adapter, "adapter is null"); + this.delegate = requireNonNull(delegate, "delegate catalog is null"); + } + + @Override + public void close() + throws IOException + { + super.close(); + adapter.close(); + + if (delegate instanceof Closeable) { + ((Closeable) delegate).close(); + } + } + + public TestingHttpServer testServer() + throws IOException + { + NodeInfo nodeInfo = new NodeInfo("test"); + HttpServerConfig config = new HttpServerConfig() + .setHttpPort(0) + .setMinThreads(4) + .setMaxThreads(8) + .setHttpAcceptorThreads(4) + .setHttpAcceptQueueSize(10) + .setHttpEnabled(true); + HttpServerInfo httpServerInfo = new HttpServerInfo(config, nodeInfo); + RESTCatalogServlet servlet = new RESTCatalogServlet(adapter); + + return new TestingHttpServer(httpServerInfo, nodeInfo, config, servlet, ImmutableMap.of()); + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private Catalog delegate; + + public Builder delegate(Catalog delegate) + { + this.delegate = delegate; + return this; + } + + public DelegatingRestSessionCatalog build() + { + requireNonNull(delegate, "Delegate must be set"); + + return new DelegatingRestSessionCatalog(new RESTCatalogAdapter(delegate), delegate); + } + } +} diff --git a/pom.xml b/pom.xml index 3d5eb5e95924..84dd96de398e 100644 --- a/pom.xml +++ b/pom.xml @@ -1634,6 +1634,23 @@ + + org.apache.iceberg + iceberg-core + ${dep.iceberg.version} + tests + + + org.apache.avro + avro + + + org.slf4j + slf4j-api + + + + org.apache.iceberg iceberg-hive-metastore diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java new file mode 100644 index 000000000000..aeced013c34f --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; +import io.trino.tests.product.launcher.testcontainers.PortBinder; +import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP; +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +/** + * Trino with Spark running against a REST server + */ +@TestsEnvironment +public class EnvSinglenodeSparkIcebergRest + extends EnvironmentProvider +{ + private static final int SPARK_THRIFT_PORT = 10213; + private static final int REST_SERVER_PORT = 8181; + private static final String SPARK_CONTAINER_NAME = "spark"; + private static final String REST_CONTAINER_NAME = "iceberg-with-rest"; + private static final String REST_SERVER_IMAGE = "tabulario/iceberg-rest:0.2.0"; + private static final String CATALOG_WAREHOUSE = "hdfs://hadoop-master:9000/user/hive/warehouse"; + + private final DockerFiles dockerFiles; + private final PortBinder portBinder; + private final String hadoopImagesVersion; + + @Inject + public EnvSinglenodeSparkIcebergRest(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder) + { + super(ImmutableList.of(standard, hadoop)); + this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null"); + this.portBinder = requireNonNull(portBinder, "portBinder is null"); + this.hadoopImagesVersion = requireNonNull(config, "config is null").getHadoopImagesVersion(); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.addContainer(createRESTContainer()); + builder.addConnector("iceberg", forHostPath(dockerFiles.getDockerFilesHostPath( + "conf/environment/singlenode-spark-iceberg-rest/iceberg.properties"))); + builder.addContainer(createSparkContainer()).containerDependsOn(SPARK_CONTAINER_NAME, HADOOP); + } + + @SuppressWarnings("resource") + private DockerContainer createRESTContainer() + { + DockerContainer container = new DockerContainer(REST_SERVER_IMAGE, REST_CONTAINER_NAME) + .withEnv("CATALOG_WAREHOUSE", CATALOG_WAREHOUSE) + .withEnv("REST_PORT", Integer.toString(REST_SERVER_PORT)) + .withEnv("HADOOP_USER_NAME", "hive") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(REST_SERVER_PORT)); + + portBinder.exposePort(container, REST_SERVER_PORT); + return container; + } + + @SuppressWarnings("resource") + private DockerContainer createSparkContainer() + { + DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, SPARK_CONTAINER_NAME) + .withEnv("HADOOP_USER_NAME", "hive") + .withCopyFileToContainer( + forHostPath(dockerFiles.getDockerFilesHostPath( + "conf/environment/singlenode-spark-iceberg-rest/spark-defaults.conf")), + "/spark/conf/spark-defaults.conf") + .withCommand( + "spark-submit", + "--master", "local[*]", + "--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", + "--name", "Thrift JDBC/ODBC Server", + "--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT, + "spark-internal") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(SPARK_THRIFT_PORT)); + + portBinder.exposePort(container, SPARK_THRIFT_PORT); + return container; + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java index 3e08dc93fd57..9037cefcb2d7 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java @@ -18,6 +18,7 @@ import io.trino.tests.product.launcher.env.EnvironmentDefaults; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeHiveIcebergRedirections; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIceberg; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergRest; import io.trino.tests.product.launcher.suite.Suite; import io.trino.tests.product.launcher.suite.SuiteTestRun; @@ -41,6 +42,9 @@ public List getTestRuns(EnvironmentConfig config) .build(), testOnEnvironment(EnvSinglenodeHiveIcebergRedirections.class) .withGroups("configured_features", "hive_iceberg_redirections") + .build(), + testOnEnvironment(EnvSinglenodeSparkIcebergRest.class) + .withGroups("configured_features", "iceberg_rest") .build()); } } diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-rest/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-rest/iceberg.properties new file mode 100644 index 000000000000..9815b34dfee4 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-rest/iceberg.properties @@ -0,0 +1,3 @@ +connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://iceberg-with-rest:8181/ diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-rest/spark-defaults.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-rest/spark-defaults.conf new file mode 100644 index 000000000000..be92ee99931e --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-rest/spark-defaults.conf @@ -0,0 +1,9 @@ +spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions +spark.sql.defaultCatalog=iceberg_test +spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.iceberg_test.catalog-impl=org.apache.iceberg.rest.RESTCatalog +spark.sql.catalog.iceberg_test.uri=http://iceberg-with-rest:8181/ +; disabling caching allows us to run spark queries interchangeably with trino's +spark.sql.catalog.iceberg_test.cache-enabled=false +spark.sql.catalog.iceberg_test.warehouse=hdfs://hadoop-master:9000/user/hive/warehouse +spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index 824571aae0e0..b56aac6001fe 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -69,6 +69,7 @@ public final class TestGroups public static final String TWO_HIVES = "two_hives"; public static final String ICEBERG = "iceberg"; public static final String ICEBERG_FORMAT_VERSION_COMPATIBILITY = "iceberg_format_version_compatibility"; + public static final String ICEBERG_REST = "iceberg_rest"; public static final String AVRO = "avro"; public static final String PHOENIX = "phoenix"; public static final String CLICKHOUSE = "clickhouse"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 85c6259c1133..3be74d340ca9 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -62,6 +62,7 @@ import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.ICEBERG; +import static io.trino.tests.product.TestGroups.ICEBERG_REST; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AND_INSERT; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AS_SELECT; @@ -111,10 +112,17 @@ public void tearDown() private static final String TRINO_CATALOG = "iceberg"; private static final String TEST_SCHEMA_NAME = "default"; - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @BeforeTestWithContext + public void setUp() + { + // we create default schema so that we can re-use the same test for the Iceberg REST catalog (since Iceberg itself doesn't support a default schema) + onTrino().executeQuery(format("CREATE SCHEMA IF NOT EXISTS %s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_primitive_types_" + storageFormat; + String baseTableName = toLowerCase("test_trino_reading_primitive_types_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -208,10 +216,10 @@ public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersi onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "testSparkReadingTrinoDataDataProvider") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "testSparkReadingTrinoDataDataProvider") public void testSparkReadingTrinoData(StorageFormat storageFormat, CreateMode createMode) { - String baseTableName = "test_spark_reading_primitive_types_" + storageFormat + "_" + createMode; + String baseTableName = toLowerCase("test_spark_reading_primitive_types_" + storageFormat + "_" + createMode); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -334,10 +342,10 @@ public Object[][] testSparkReadingTrinoDataDataProvider() .toArray(Object[][]::new); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadTrinoUuid(StorageFormat storageFormat) { - String tableName = "test_spark_read_trino_uuid_" + storageFormat; + String tableName = toLowerCase("test_spark_read_trino_uuid_" + storageFormat); String trinoTableName = trinoTableName(tableName); String sparkTableName = sparkTableName(tableName); @@ -354,7 +362,7 @@ public void testSparkReadTrinoUuid(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "specVersions") public void testSparkCreatesTrinoDrops(int specVersion) { String baseTableName = "test_spark_creates_trino_drops"; @@ -362,7 +370,7 @@ public void testSparkCreatesTrinoDrops(int specVersion) onTrino().executeQuery("DROP TABLE " + trinoTableName(baseTableName)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testTrinoCreatesSparkDrops() { String baseTableName = "test_trino_creates_spark_drops"; @@ -370,10 +378,10 @@ public void testTrinoCreatesSparkDrops() onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) { - String baseTableName = "test_spark_reads_trino_partitioned_table_" + storageFormat; + String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -398,10 +406,10 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reads_spark_partitioned_table_" + storageFormat; + String baseTableName = toLowerCase("test_trino_reads_spark_partitioned_table_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); @@ -460,10 +468,10 @@ public void testPartitionedByNestedFiled() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_spark_composites_" + storageFormat; + String baseTableName = toLowerCase("test_trino_reading_spark_composites_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -488,10 +496,10 @@ public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) { - String baseTableName = "test_spark_reading_trino_composites_" + storageFormat; + String baseTableName = toLowerCase("test_spark_reading_trino_composites_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -515,10 +523,10 @@ public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_spark_iceberg_table_properties_" + storageFormat; + String baseTableName = toLowerCase("test_trino_reading_spark_iceberg_table_properties_" + storageFormat); String propertiesTableName = "\"" + baseTableName + "$properties\""; String sparkTableName = sparkTableName(baseTableName); String trinoPropertiesTableName = trinoTableName(propertiesTableName); @@ -543,10 +551,10 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_trino_reading_nested_spark_data_" + storageFormat; + String baseTableName = toLowerCase("test_trino_reading_nested_spark_data_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -600,10 +608,10 @@ public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int spe onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) { - String baseTableName = "test_spark_reading_nested_trino_data_" + storageFormat; + String baseTableName = toLowerCase("test_spark_reading_nested_trino_data_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -657,10 +665,10 @@ public void testSparkReadingNestedTrinoData(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_schema_evolution_for_nested_fields_" + storageFormat; + String baseTableName = toLowerCase("test_schema_evolution_for_nested_fields_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -775,10 +783,10 @@ public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_read_after_partition_evolution_" + storageFormat; + String baseTableName = toLowerCase("test_read_after_partition_evolution_" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -869,7 +877,7 @@ public void testTrinoShowingSparkCreatedTables(int specVersion) onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "specVersions") public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion) { String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data"; @@ -911,7 +919,7 @@ public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion) @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_object_storage_location_provider_" + storageFormat; + String baseTableName = toLowerCase("test_object_storage_location_provider_" + storageFormat); String sparkTableName = sparkTableName(baseTableName); String trinoTableName = trinoTableName(baseTableName); String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_object_storage_location_provider/obj-data"; @@ -941,7 +949,7 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_writer_data_path_" + storageFormat; + String baseTableName = toLowerCase("test_writer_data_path_" + storageFormat); String sparkTableName = sparkTableName(baseTableName); String trinoTableName = trinoTableName(baseTableName); String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_writer_data_path_/obj-data"; @@ -1005,7 +1013,7 @@ public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageForma Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), ((value, index) -> row((int) index, value))) .collect(toImmutableList()); - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testStringPartitioningWithSpecialCharactersCtasInTrino() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino"; @@ -1023,7 +1031,7 @@ public void testStringPartitioningWithSpecialCharactersCtasInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testStringPartitioningWithSpecialCharactersInsertInTrino() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino"; @@ -1039,7 +1047,7 @@ public void testStringPartitioningWithSpecialCharactersInsertInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testStringPartitioningWithSpecialCharactersInsertInSpark() { String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_spark"; @@ -1073,7 +1081,7 @@ public void testPartitioningWithMixedCaseColumnUnsupportedInTrino() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testInsertReadingFromParquetTableWithNestedRowFieldNotPresentInDataFile() { // regression test for https://github.com/trinodb/trino/issues/9264 @@ -1157,7 +1165,7 @@ private void assertSelectsOnSpecialCharacters(String trinoTableName, String spar /** * @see TestIcebergInsert#testIcebergConcurrentInsert() */ - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, timeOut = 60_000) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, timeOut = 60_000) public void testTrinoSparkConcurrentInsert() throws Exception { @@ -1224,13 +1232,13 @@ public void testTrinoSparkConcurrentInsert() } } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsAndCompressionCodecs") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsAndCompressionCodecs") public void testTrinoReadingSparkCompressedData(StorageFormat storageFormat, String compressionCodec) { - String baseTableName = "test_spark_compression" + + String baseTableName = toLowerCase("test_spark_compression" + "_" + storageFormat + "_" + compressionCodec + - "_" + randomNameSuffix(); + "_" + randomNameSuffix()); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -1289,13 +1297,13 @@ else if ("ZSTD".equals(compressionCodec)) { onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsAndCompressionCodecs") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsAndCompressionCodecs") public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, String compressionCodec) { - String baseTableName = "test_trino_compression" + + String baseTableName = toLowerCase("test_trino_compression" + "_" + storageFormat + "_" + compressionCodec + - "_" + randomNameSuffix(); + "_" + randomNameSuffix()); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -1489,7 +1497,7 @@ public void testMigratedDataWithPartialNameMapping(StorageFormat storageFormat) .containsOnly(row(1, null)); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testPartialStats() { String tableName = "test_partial_stats_" + randomNameSuffix(); @@ -1519,7 +1527,7 @@ public void testPartialStats() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testStatsAfterAddingPartitionField() { String tableName = "test_stats_after_adding_partition_field_" + randomNameSuffix(); @@ -1545,10 +1553,10 @@ public void testStatsAfterAddingPartitionField() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "tableFormatWithDeleteFormat") public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { - String tableName = format("test_trino_reads_spark_row_level_deletes_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomNameSuffix()); + String tableName = toLowerCase(format("test_trino_reads_spark_row_level_deletes_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomNameSuffix())); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1578,10 +1586,10 @@ public void testTrinoReadsSparkRowLevelDeletes(StorageFormat tableStorageFormat, onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "tableFormatWithDeleteFormat") public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { - String tableName = format("test_trino_reads_spark_row_level_deletes_row_types_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomNameSuffix()); + String tableName = toLowerCase(format("test_trino_reads_spark_row_level_deletes_row_types_%s_%s_%s", tableStorageFormat.name(), deleteFileStorageFormat.name(), randomNameSuffix())); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1603,10 +1611,10 @@ public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableSt onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) { - String tableName = format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), randomNameSuffix()); + String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), randomNameSuffix())); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1630,10 +1638,10 @@ public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat) onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat) { - String tableName = format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), randomNameSuffix()); + String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), randomNameSuffix())); String sparkTableName = sparkTableName(tableName); String trinoTableName = trinoTableName(tableName); @@ -1649,10 +1657,10 @@ public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storage onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testDeleteAfterPartitionEvolution(StorageFormat storageFormat) { - String baseTableName = "test_delete_after_partition_evolution_" + storageFormat + randomNameSuffix(); + String baseTableName = toLowerCase("test_delete_after_partition_evolution_" + storageFormat + randomNameSuffix()); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); @@ -1702,7 +1710,7 @@ public void testDeleteAfterPartitionEvolution(StorageFormat storageFormat) onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testMissingMetrics() { String tableName = "test_missing_metrics_" + randomNameSuffix(); @@ -1716,7 +1724,7 @@ public void testMissingMetrics() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testOptimizeOnV2IcebergTable() { String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomNameSuffix()); @@ -1835,10 +1843,10 @@ public enum CreateMode CREATE_TABLE_WITH_NO_DATA_AND_INSERT, } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormats") public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) { - String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots" + storageFormat; + String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_after_expiring_snapshots" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1875,10 +1883,10 @@ public void testSparkReadsTrinoTableAfterCleaningUp(StorageFormat storageFormat) onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat; + String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_after_expiring_snapshots_after_optimize" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1919,10 +1927,10 @@ public void testSparkReadsTrinoTableAfterOptimizeAndCleaningUp(StorageFormat sto onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "storageFormatsWithSpecVersion") public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(StorageFormat storageFormat, int specVersion) { - String baseTableName = "test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat; + String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_with_deletes_after_expiring_snapshots_after_optimize" + storageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1951,10 +1959,10 @@ public void testTrinoReadsTrinoTableWithSparkDeletesAfterOptimizeAndCleanUp(Stor onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "tableFormatWithDeleteFormat") + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}, dataProvider = "tableFormatWithDeleteFormat") public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableStorageFormat, StorageFormat deleteFileStorageFormat) { - String baseTableName = "test_cleaning_up_iceberg_table_fails_for_table_v2" + tableStorageFormat; + String baseTableName = toLowerCase("test_cleaning_up_iceberg_table_fails_for_table_v2" + tableStorageFormat); String trinoTableName = trinoTableName(baseTableName); String sparkTableName = sparkTableName(baseTableName); onTrino().executeQuery("DROP TABLE IF EXISTS " + trinoTableName); @@ -1988,7 +1996,7 @@ public void testCleaningUpIcebergTableWithRowLevelDeletes(StorageFormat tableSto .containsOnly(row); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testUpdateAfterSchemaEvolution() { String baseTableName = "test_update_after_schema_evolution_" + randomNameSuffix(); @@ -2030,7 +2038,7 @@ public void testUpdateAfterSchemaEvolution() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testUpdateOnPartitionColumn() { String baseTableName = "test_update_on_partition_column" + randomNameSuffix(); @@ -2063,7 +2071,7 @@ public void testUpdateOnPartitionColumn() onSpark().executeQuery("DROP TABLE " + sparkTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testAddNotNullColumn() { String baseTableName = "test_add_not_null_column_" + randomNameSuffix(); @@ -2082,7 +2090,7 @@ public void testAddNotNullColumn() onTrino().executeQuery("DROP TABLE " + trinoTableName); } - @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}) + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST}) public void testHandlingPartitionSchemaEvolutionInPartitionMetadata() { String baseTableName = "test_handling_partition_schema_evolution_" + randomNameSuffix(); @@ -2477,4 +2485,9 @@ private String getColumnComment(String tableName, String columnName) { return (String) onTrino().executeQuery("SELECT comment FROM " + TRINO_CATALOG + ".information_schema.columns WHERE table_schema = '" + TEST_SCHEMA_NAME + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'").getOnlyValue(); } + + private static String toLowerCase(String name) + { + return name.toLowerCase(ENGLISH); + } }