diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index fc8617db63c5..b8abb23e421c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -76,6 +76,7 @@ import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.RowChangeParadigm; +import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SystemTable; @@ -700,6 +701,10 @@ public Optional getNewTableLayout(ConnectorSession session public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { verify(transaction == null, "transaction already set"); + String schemaName = tableMetadata.getTable().getSchemaName(); + if (!schemaExists(session, schemaName)) { + throw new SchemaNotFoundException(schemaName); + } transaction = newCreateTableTransaction(catalog, tableMetadata, session); String location = transaction.table().location(); TrinoFileSystem fileSystem = fileSystemFactory.create(session); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 447e2706bbaf..0667c161d743 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -395,6 +395,25 @@ public void testUnregisterTableAccessControl() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testCreateTableWithNonExistingSchemaVerifyLocation() + { + String schemaName = "non_existing_schema_" + randomNameSuffix(); + String tableName = "test_create_table_in_non_existent_schema_" + randomNameSuffix(); + String tableLocation = schemaPath() + "/" + tableName; + assertQueryFails( + "CREATE TABLE " + schemaName + "." + tableName + " (a int, b int) WITH (location = '" + tableLocation + "')", + "Schema (.*) not found"); + assertThat(locationExists(tableLocation)) + .as("location should not exist").isFalse(); + + assertQueryFails( + "CREATE TABLE " + schemaName + "." + tableName + " (a, b) WITH (location = '" + tableLocation + "') AS VALUES (1, 2), (3, 4)", + "Schema (.*) not found"); + assertThat(locationExists(tableLocation)) + .as("location should not exist").isFalse(); + } + private String getTableLocation(String tableName) { return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName); @@ -413,4 +432,8 @@ protected String getColumnComment(String tableName, String columnName) protected abstract void dropTableFromMetastore(String tableName); protected abstract String getMetadataLocation(String tableName); + + protected abstract String schemaPath(); + + protected abstract boolean locationExists(String location); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java index d2a1d432edf7..3dde19c49bc5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java @@ -235,6 +235,19 @@ protected String getMetadataLocation(String tableName) .getParameters().get("metadata_location"); } + @Override + protected String schemaPath() + { + return format("s3://%s/%s", bucketName, schemaName); + } + + @Override + protected boolean locationExists(String location) + { + String prefix = "s3://" + bucketName + "/"; + return !hiveMinioDataLake.listFiles(location.substring(prefix.length())).isEmpty(); + } + @Override protected void deleteDirectory(String location) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 4c6841879641..1a5e57b4aceb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -135,9 +135,6 @@ public DistributedQueryRunner build() icebergProperties.put("iceberg.catalog.type", "TESTING_FILE_METASTORE"); icebergProperties.put("hive.metastore.catalog.dir", dataDir.toString()); } - if ("jdbc".equalsIgnoreCase(catalogType) && !icebergProperties.containsKey("iceberg.jdbc-catalog.default-warehouse-dir")) { - icebergProperties.put("iceberg.jdbc-catalog.default-warehouse-dir", dataDir.toString()); - } queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java index 9f8ccf72b70c..8b27ba3a2695 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAbfsConnectorSmokeTest.java @@ -140,6 +140,18 @@ protected String getMetadataLocation(String tableName) .getParameters().get("metadata_location"); } + @Override + protected String schemaPath() + { + return formatAbfsUrl(container, account, bucketName) + schemaName; + } + + @Override + protected boolean locationExists(String location) + { + return hiveHadoop.executeInContainer("hadoop", "fs", "-test", "-d", location).getExitCode() == 0; + } + @Override protected void deleteDirectory(String location) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java index 54bf31d7151b..5458471b01a3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java @@ -27,6 +27,7 @@ import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static java.lang.String.format; import static org.apache.iceberg.FileFormat.ORC; import static org.assertj.core.api.Assertions.assertThat; @@ -79,6 +80,18 @@ protected String getMetadataLocation(String tableName) .getParameters().get("metadata_location"); } + @Override + protected String schemaPath() + { + return format("%s/%s", metastoreDir, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + return Files.exists(Path.of(location)); + } + @Override protected void deleteDirectory(String location) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java index e59e8a28d9e6..aa8b9aa0fe52 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java @@ -103,7 +103,7 @@ protected QueryRunner createQueryRunner() Configuration configuration = ConfigurationInstantiator.newEmptyConfiguration(); new GoogleGcsConfigurationInitializer(gcsConfig).initializeConfiguration(configuration); - this.fileSystem = FileSystem.newInstance(new URI(schemaUrl()), configuration); + this.fileSystem = FileSystem.newInstance(new URI(schemaPath()), configuration); } catch (IOException e) { throw new UncheckedIOException(e); @@ -124,7 +124,7 @@ protected QueryRunner createQueryRunner() SchemaInitializer.builder() .withClonedTpchTables(REQUIRED_TPCH_TABLES) .withSchemaName(schema) - .withSchemaProperties(ImmutableMap.of("location", "'" + schemaUrl() + "'")) + .withSchemaProperties(ImmutableMap.of("location", "'" + schemaPath() + "'")) .build()) .build(); } @@ -134,11 +134,11 @@ public void removeTestData() { if (fileSystem != null) { try { - fileSystem.delete(new org.apache.hadoop.fs.Path(schemaUrl()), true); + fileSystem.delete(new org.apache.hadoop.fs.Path(schemaPath()), true); } catch (IOException e) { // The GCS bucket should be configured to expire objects automatically. Clean up issues do not need to fail the test. - LOG.warn(e, "Failed to clean up GCS test directory: %s", schemaUrl()); + LOG.warn(e, "Failed to clean up GCS test directory: %s", schemaPath()); } fileSystem = null; } @@ -160,14 +160,26 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) @Override protected String createSchemaSql(String schema) { - return format("CREATE SCHEMA %1$s WITH (location = '%2$s%1$s')", schema, schemaUrl()); + return format("CREATE SCHEMA %1$s WITH (location = '%2$s%1$s')", schema, schemaPath()); } - private String schemaUrl() + @Override + protected String schemaPath() { return format("gs://%s/%s/", gcpStorageBucket, schema); } + @Override + protected boolean locationExists(String location) + { + try { + return fileSystem.exists(new org.apache.hadoop.fs.Path(location)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Test @Override public void testRenameSchema() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 41b6eb737b0f..3104324db393 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -144,6 +144,7 @@ public void testCreateTable() ImmutableMultiset.builder() .add(CREATE_TABLE) .add(GET_DATABASE) + .add(GET_DATABASE) .add(GET_TABLE) .build()); } @@ -158,6 +159,7 @@ public void testCreateTableAsSelect() try { assertGlueMetastoreApiInvocations("CREATE TABLE test_ctas AS SELECT 1 AS age", ImmutableMultiset.builder() + .add(GET_DATABASE) .add(GET_DATABASE) .add(CREATE_TABLE) .add(GET_TABLE) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index 9a4bca3d38dc..e1d799b5f47e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -224,8 +224,22 @@ protected void deleteDirectory(String location) assertThat(s3.listObjects(bucketName, location).getObjectSummaries()).isEmpty(); } - private String schemaPath() + @Override + protected String schemaPath() { return format("s3://%s/%s", bucketName, schemaName); } + + @Override + protected boolean locationExists(String location) + { + String prefix = "s3://" + bucketName + "/"; + AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); + ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(bucketName) + .withPrefix(location.substring(prefix.length())) + .withMaxKeys(1); + return !s3.listObjectsV2(request) + .getObjectSummaries().isEmpty(); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java index b412b32e0b47..1c2a35c8e88d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java @@ -20,13 +20,22 @@ import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD; import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestIcebergJdbcCatalogConnectorSmokeTest extends BaseIcebergConnectorSmokeTest { + private File warehouseLocation; + public TestIcebergJdbcCatalogConnectorSmokeTest() { super(new IcebergConfig().getFileFormat().toIceberg()); @@ -48,6 +57,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) protected QueryRunner createQueryRunner() throws Exception { + warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_catalog_smoke_test").toFile(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); return IcebergQueryRunner.builder() .setIcebergProperties( @@ -59,6 +70,7 @@ protected QueryRunner createQueryRunner() .put("iceberg.jdbc-catalog.connection-password", PASSWORD) .put("iceberg.jdbc-catalog.catalog-name", "tpch") .put("iceberg.register-table-procedure.enabled", "true") + .put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.getAbsolutePath()) .buildOrThrow()) .setInitialTables(REQUIRED_TPCH_TABLES) .build(); @@ -98,6 +110,18 @@ protected String getMetadataLocation(String tableName) throw new UnsupportedOperationException("metadata location for register_table is not supported"); } + @Override + protected String schemaPath() + { + return format("%s/%s", warehouseLocation, getSession().getSchema().orElseThrow()); + } + + @Override + protected boolean locationExists(String location) + { + return Files.exists(Path.of(location)); + } + @Override public void testRegisterTableWithTableLocation() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java index 912fc6d34345..f802a99f54ce 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcConnectorTest.java @@ -23,8 +23,13 @@ import io.trino.tpch.TpchTable; import org.testng.annotations.Test; +import java.io.File; +import java.nio.file.Files; +import java.util.Optional; import java.util.OptionalInt; +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD; import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER; import static io.trino.tpch.TpchTable.LINE_ITEM; @@ -34,6 +39,8 @@ public class TestIcebergJdbcConnectorTest extends BaseIcebergConnectorTest { + private File warehouseLocation; + public TestIcebergJdbcConnectorTest() { super(new IcebergConfig().getFileFormat()); @@ -43,8 +50,11 @@ public TestIcebergJdbcConnectorTest() protected QueryRunner createQueryRunner() throws Exception { + warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_connector_test").toFile(); + closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer()); return IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation.toPath())) .setIcebergProperties( ImmutableMap.builder() .put("iceberg.file-format", format.name()) @@ -53,6 +63,7 @@ protected QueryRunner createQueryRunner() .put("iceberg.jdbc-catalog.connection-user", USER) .put("iceberg.jdbc-catalog.connection-password", PASSWORD) .put("iceberg.jdbc-catalog.catalog-name", "tpch") + .put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.toPath().resolve("iceberg_data").toFile().getAbsolutePath()) .buildOrThrow()) .setInitialTables(ImmutableList.>builder() .addAll(REQUIRED_TPCH_TABLES) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java index 42d2e3296adc..a290e565147d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java @@ -25,16 +25,20 @@ import org.assertj.core.util.Files; import java.io.File; +import java.nio.file.Path; import java.util.Optional; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestIcebergRestCatalogConnectorSmokeTest extends BaseIcebergConnectorSmokeTest { + private File warehouseLocation; + public TestIcebergRestCatalogConnectorSmokeTest() { super(new IcebergConfig().getFileFormat().toIceberg()); @@ -56,7 +60,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) protected QueryRunner createQueryRunner() throws Exception { - File warehouseLocation = Files.newTemporaryFolder(); + warehouseLocation = Files.newTemporaryFolder(); closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE)); Catalog backend = backendCatalog(warehouseLocation); @@ -116,6 +120,18 @@ protected String getMetadataLocation(String tableName) throw new UnsupportedOperationException("metadata location for register_table is not supported"); } + @Override + protected String schemaPath() + { + return format("%s/%s", warehouseLocation, getSession().getSchema()); + } + + @Override + protected boolean locationExists(String location) + { + return java.nio.file.Files.exists(Path.of(location)); + } + @Override public void testRegisterTableWithTableLocation() {