From 3b6b818ae3eb46545f2b829e566cb580571a444a Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 1 Aug 2024 08:59:34 +0900 Subject: [PATCH 1/2] Update docker image version to 100 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b8ce5081e093..fd03468e44e2 100644 --- a/pom.xml +++ b/pom.xml @@ -189,7 +189,7 @@ 1.12.765 4.17.0 7.5.1 - 99 + 100 1.22 2.29.2 10.17.0 From d291a0224c2e47478a0c7a6b7b5c233d805e758e Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Thu, 1 Aug 2024 10:07:31 +0900 Subject: [PATCH 2/2] Add support for Polaris catalog in Iceberg --- plugin/trino-iceberg/pom.xml | 6 + .../plugin/iceberg/IcebergQueryRunner.java | 32 +++ ...ebergPolarisCatalogConnectorSmokeTest.java | 234 ++++++++++++++++++ .../catalog/rest/TestingPolarisCatalog.java | 132 ++++++++++ 4 files changed, 404 insertions(+) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingPolarisCatalog.java diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index e30d383a4ec1..031fbe4a1fde 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -324,6 +324,12 @@ runtime + + io.airlift + http-client + runtime + + io.airlift log-manager diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 4e4a63768328..49814e82f1cf 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -24,6 +24,7 @@ import io.trino.plugin.hive.containers.HiveHadoop; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer; +import io.trino.plugin.iceberg.catalog.rest.TestingPolarisCatalog; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; @@ -196,6 +197,37 @@ public static void main(String[] args) } } + public static final class IcebergPolarisQueryRunnerMain + { + private IcebergPolarisQueryRunnerMain() {} + + public static void main(String[] args) + throws Exception + { + File warehouseLocation = Files.newTemporaryFolder(); + warehouseLocation.deleteOnExit(); + + @SuppressWarnings("resource") + TestingPolarisCatalog polarisCatalog = new TestingPolarisCatalog(warehouseLocation.getPath()); + + @SuppressWarnings("resource") + QueryRunner queryRunner = IcebergQueryRunner.builder() + .addCoordinatorProperty("http-server.http.port", "8080") + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .addIcebergProperty("iceberg.catalog.type", "rest") + .addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog") + .addIcebergProperty("iceberg.rest-catalog.warehouse", TestingPolarisCatalog.WAREHOUSE) + .addIcebergProperty("iceberg.rest-catalog.security", "OAUTH2") + .addIcebergProperty("iceberg.rest-catalog.oauth2.token", polarisCatalog.oauth2Token()) + .setInitialTables(TpchTable.getTables()) + .build(); + + Logger log = Logger.get(IcebergPolarisQueryRunnerMain.class); + log.info("======== SERVER STARTED ========"); + log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl()); + } + } + public static final class IcebergGlueQueryRunnerMain { private IcebergGlueQueryRunnerMain() {} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..56cf9dcccac2 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java @@ -0,0 +1,234 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.trino.filesystem.Location; +import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergConnector; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.connector.SchemaTableName; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import org.apache.iceberg.BaseTable; +import org.assertj.core.util.Files; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Isolated; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting; +import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; +import static java.lang.String.format; +import static org.apache.iceberg.FileFormat.PARQUET; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assumptions.abort; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@Isolated // TODO remove +@TestInstance(PER_CLASS) +final class TestIcebergPolarisCatalogConnectorSmokeTest + extends BaseIcebergConnectorSmokeTest +{ + private TestingPolarisCatalog polarisCatalog; + private File warehouseLocation; + + public TestIcebergPolarisCatalogConnectorSmokeTest() + { + super(new IcebergConfig().getFileFormat().toIceberg()); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + return switch (connectorBehavior) { + case SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_RENAME_SCHEMA -> false; + default -> super.hasBehavior(connectorBehavior); + }; + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + warehouseLocation = Files.newTemporaryFolder(); + polarisCatalog = closeAfterClass(new TestingPolarisCatalog(warehouseLocation.getPath())); + + return IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) + .addIcebergProperty("iceberg.file-format", format.name()) + .addIcebergProperty("iceberg.register-table-procedure.enabled", "true") + .addIcebergProperty("iceberg.writer-sort-buffer-size", "1MB") + .addIcebergProperty("iceberg.catalog.type", "rest") + .addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog") + .addIcebergProperty("iceberg.rest-catalog.warehouse", TestingPolarisCatalog.WAREHOUSE) + .addIcebergProperty("iceberg.rest-catalog.security", "OAUTH2") + .addIcebergProperty("iceberg.rest-catalog.oauth2.token", polarisCatalog.oauth2Token()) + .setInitialTables(REQUIRED_TPCH_TABLES) + .build(); + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + polarisCatalog.dropTable(getSession().getSchema().orElseThrow(), tableName); + } + + @Override + protected String getMetadataLocation(String tableName) + { + TrinoCatalogFactory catalogFactory = ((IcebergConnector) getQueryRunner().getCoordinator().getConnector("iceberg")).getInjector().getInstance(TrinoCatalogFactory.class); + TrinoCatalog trinoCatalog = catalogFactory.create(getSession().getIdentity().toConnectorIdentity()); + BaseTable table = (BaseTable) trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName)); + return table.operations().current().metadataFileLocation(); + } + + @Override + protected String getTableLocation(String tableName) + { + TrinoCatalogFactory catalogFactory = ((IcebergConnector) getQueryRunner().getCoordinator().getConnector("iceberg")).getInjector().getInstance(TrinoCatalogFactory.class); + TrinoCatalog trinoCatalog = catalogFactory.create(getSession().getIdentity().toConnectorIdentity()); + BaseTable table = (BaseTable) trinoCatalog.loadTable(getSession().toConnectorSession(), new SchemaTableName(getSession().getSchema().orElseThrow(), tableName)); + return table.operations().current().location(); + } + + @Override + protected String schemaPath() + { + return format("file://%s/%s", warehouseLocation, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + return java.nio.file.Files.exists(Path.of(location)); + } + + @Override + protected boolean isFileSorted(Location path, String sortColumnName) + { + if (format == PARQUET) { + return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); + } + return checkOrcFileSorting(fileSystem, path, sortColumnName); + } + + @Override + protected void deleteDirectory(String location) + { + try { + deleteRecursively(Path.of(location.replaceAll("^file://", "")), ALLOW_INSECURE); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Test + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasMessageContaining("createMaterializedView is not supported for Iceberg REST catalog"); + } + + @Test + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasMessageContaining("renameNamespace is not supported for Iceberg REST catalog"); + } + + @Test + @Override + public void testRegisterTableWithDroppedTable() + { + assertThatThrownBy(super::testRegisterTableWithDroppedTable) + .hasStackTraceContaining("Failed to open input stream for file"); + } + + @Test + @Override + public void testDropTableWithMissingDataFile() + { + assertThatThrownBy(super::testDropTableWithMissingDataFile) + .hasMessageContaining("Expecting value to be false but was true"); + } + + @Test + @Override + public void testRegisterTableWithTrailingSpaceInLocation() + { + assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation) + .hasStackTraceContaining("Illegal character in path"); + } + + @Test + @Override + public void testCreateTableWithTrailingSpaceInLocation() + { + assertThatThrownBy(super::testCreateTableWithTrailingSpaceInLocation) + .hasStackTraceContaining("Illegal character in path"); + } + + @Test + @Override + public void testDropTableWithMissingMetadataFile() + { + assertThatThrownBy(super::testDropTableWithMissingMetadataFile) + .hasMessageMatching(".* Table '.*' does not exist"); + } + + @Test + @Override + public void testDropTableWithMissingSnapshotFile() + { + assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) + .hasStackTraceContaining("Expecting value to be false but was true"); + } + + @Test + @Override + public void testDropTableWithMissingManifestListFile() + { + assertThatThrownBy(super::testDropTableWithMissingManifestListFile) + .hasMessageContaining("Table location should not exist"); + } + + @Test + @Override + public void testDropTableWithNonExistentTableLocation() + { + assertThatThrownBy(super::testDropTableWithNonExistentTableLocation) + .hasMessageMatching(".* Table '.*' does not exist"); + } + + @Test + @Override + public void testDeleteRowsConcurrently() + { + abort("Skipped for now due to flakiness"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingPolarisCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingPolarisCatalog.java new file mode 100644 index 000000000000..e546addb2774 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestingPolarisCatalog.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.Request; +import io.airlift.http.client.jetty.JettyHttpClient; +import org.intellij.lang.annotations.Language; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; + +import java.io.Closeable; +import java.net.URI; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; +import static io.airlift.http.client.StatusResponseHandler.createStatusResponseHandler; +import static io.trino.testing.TestingProperties.getDockerImagesVersion; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +public final class TestingPolarisCatalog + implements Closeable +{ + public static final String WAREHOUSE = "polaris"; + private static final int POLARIS_PORT = 8181; + private static final Pattern PATTERN = Pattern.compile("realm: default-realm root principal credentials: [a-f0-9]+:(?[a-f0-9]+)"); + + private static final HttpClient HTTP_CLIENT = new JettyHttpClient(); + + private final GenericContainer polarisCatalog; + private final String clientSecret; + private final String warehouseLocation; + + public TestingPolarisCatalog(String warehouseLocation) + { + this.warehouseLocation = requireNonNull(warehouseLocation, "warehouseLocation is null"); + + // TODO: Use the official docker image once Polaris community provides it + polarisCatalog = new GenericContainer<>("ghcr.io/trinodb/testing/polaris-catalog:" + getDockerImagesVersion()); + polarisCatalog.addExposedPort(POLARIS_PORT); + polarisCatalog.withFileSystemBind(warehouseLocation, warehouseLocation, BindMode.READ_WRITE); + polarisCatalog.waitingFor(new LogMessageWaitStrategy().withRegEx(".*o.eclipse.jetty.server.Server: Started.*")); + polarisCatalog.start(); + + clientSecret = findClientSecret(); + createCatalog(); + grantPrivilege(); + } + + private String findClientSecret() + { + return Stream.of(polarisCatalog.getLogs().split("\n")) + .map(PATTERN::matcher) + .filter(Matcher::find) + .map(matcher -> matcher.group("secret")) + .collect(onlyElement()); + } + + private void createCatalog() + { + @Language("JSON") + String body = "{" + + "\"name\": \"polaris\"," + + "\"id\": 1," + + "\"type\": \"INTERNAL\"," + + "\"readOnly\": false, " + + "\"storageConfigInfo\": {\"storageType\": \"FILE\"}, \"properties\": {\"default-base-location\": \"file://" + warehouseLocation + "\"}" + + "}"; + Request request = Request.Builder.preparePost() + .setUri(URI.create(restUri() + "/api/management/v1/catalogs")) + .setHeader("Authorization", "Bearer " + oauth2Token()) + .setHeader("Content-Type", "application/json") + .setBodyGenerator(createStaticBodyGenerator(body, UTF_8)) + .build(); + HTTP_CLIENT.execute(request, createStatusResponseHandler()); + } + + private void grantPrivilege() + { + @Language("JSON") + String body = "{\"grant\": {\"type\": \"catalog\", \"privilege\": \"TABLE_WRITE_DATA\"}}"; + Request request = Request.Builder.preparePut() + .setUri(URI.create(restUri() + "/api/management/v1/catalogs/polaris/catalog-roles/catalog_admin/grants")) + .setHeader("Authorization", "Bearer " + oauth2Token()) + .setHeader("Content-Type", "application/json") + .setBodyGenerator(createStaticBodyGenerator(body, UTF_8)) + .build(); + HTTP_CLIENT.execute(request, createStatusResponseHandler()); + } + + public void dropTable(String schema, String table) + { + Request request = Request.Builder.prepareDelete() + .setUri(URI.create(restUri() + "/api/catalog/v1/polaris/namespaces/" + schema + "/tables/" + table)) + .setHeader("Authorization", "Bearer " + oauth2Token()) + .setHeader("Content-Type", "application/json") + .build(); + HTTP_CLIENT.execute(request, createStatusResponseHandler()); + } + + public String restUri() + { + return "http://%s:%s".formatted(polarisCatalog.getHost(), polarisCatalog.getMappedPort(POLARIS_PORT)); + } + + public String oauth2Token() + { + return "principal:root;password:%s;realm:default-realm;role:ALL".formatted(clientSecret); + } + + @Override + public void close() + { + polarisCatalog.close(); + } +}