diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst
index bb7cb2a12571..4efc6c07ac74 100644
--- a/docs/src/main/sphinx/connector/iceberg.rst
+++ b/docs/src/main/sphinx/connector/iceberg.rst
@@ -116,6 +116,9 @@ is used.
   * - ``iceberg.target-max-file-size``
     - Target maximum size of written files; the actual size may be larger
     - ``1GB``
+  * - ``iceberg.unique-table-location``
+    - Use randomized, unique table locations
+    - ``false``
   * - ``iceberg.delete-schema-locations-fallback``
     - Whether schema locations should be deleted when Trino can't determine whether they contain external files.
     - ``false``
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index c2ce359fc14a..5c87476af3b0 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -565,16 +565,30 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
     {
         verify(transaction == null, "transaction already set");
         transaction = newCreateTableTransaction(catalog, tableMetadata, session);
-        return new IcebergWritableTableHandle(
-                tableMetadata.getTable().getSchemaName(),
-                tableMetadata.getTable().getTableName(),
-                SchemaParser.toJson(transaction.table().schema()),
-                PartitionSpecParser.toJson(transaction.table().spec()),
-                getColumns(transaction.table().schema(), typeManager),
-                transaction.table().location(),
-                getFileFormat(transaction.table()),
-                transaction.table().properties(),
-                retryMode);
+        String location = transaction.table().location();
+        HdfsContext hdfsContext = new HdfsContext(session);
+        try {
+            Path path = new Path(location);
+            FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
+            if (fileSystem.exists(path) && fileSystem.listFiles(path, true).hasNext()) {
+                throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format(
+                        "Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " +
+                                "to use unique table locations for every table.", location));
+            }
+            return new IcebergWritableTableHandle(
+                    tableMetadata.getTable().getSchemaName(),
+                    tableMetadata.getTable().getTableName(),
+                    SchemaParser.toJson(transaction.table().schema()),
+                    PartitionSpecParser.toJson(transaction.table().spec()),
+                    getColumns(transaction.table().schema(), typeManager),
+                    location,
+                    getFileFormat(transaction.table()),
+                    transaction.table().properties(),
+                    retryMode);
+        }
+        catch (IOException e) {
+            throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e);
+        }
     }
 
     @Override
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index 6f790c1fb85e..cf1192e0484a 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -73,6 +73,7 @@
 import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.Iterables.concat;
@@ -1021,6 +1022,38 @@ public void testLargeInOnPartitionedColumns()
         dropTable("test_in_predicate_large_set");
     }
 
+    @Test
+    public void testCreateTableFailsOnNonEmptyPath()
+    {
+        String tableName = "test_rename_table_" + randomTableSuffix();
+        String tmpName = "test_rename_table_tmp_" + randomTableSuffix();
+        try {
+            assertUpdate("CREATE TABLE " + tmpName + " AS SELECT 1 as a", 1);
+            assertUpdate("ALTER TABLE " + tmpName + " RENAME TO " + tableName);
+            assertQueryFails("CREATE TABLE " + tmpName + " AS SELECT 1 as a", "Cannot create a table on a non-empty location.*");
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tableName);
+            assertUpdate("DROP TABLE IF EXISTS " + tmpName);
+        }
+    }
+
+    @Test
+    public void testCreateTableSucceedsOnEmptyDirectory()
+    {
+        File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile();
+        String tmpName = "test_rename_table_tmp_" + randomTableSuffix();
+        Path newPath = tempDir.toPath().resolve(tmpName);
+        File directory = newPath.toFile();
+        verify(directory.mkdirs(), "Could not make directory on filesystem");
+        try {
+            assertUpdate("CREATE TABLE " + tmpName + " WITH (location='" + directory + "') AS SELECT 1 as a", 1);
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS " + tmpName);
+        }
+    }
+
     @Test
     public void testCreateTableLike()
     {
@@ -1057,21 +1090,11 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat)
                 format(" format = '%s',\n format_version = 2,\n location = '%s'\n)", format, tempDir + "/iceberg_data/tpch/test_create_table_like_copy2"));
         dropTable("test_create_table_like_copy2");
 
-        assertUpdate("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)");
-        assertEquals(getTablePropertiesString("test_create_table_like_copy3"), "WITH (\n" +
-                format(" format = '%s',\n", format) +
-                " format_version = 2,\n" +
-                format(" location = '%s',\n", tempDirPath) +
-                " partitioning = ARRAY['adate']\n" +
-                ")");
+        assertQueryFails("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)",
+                "Cannot create a table on a non-empty location.*");
 
-        assertUpdate(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat));
-        assertEquals(getTablePropertiesString("test_create_table_like_copy4"), "WITH (\n" +
-                format(" format = '%s',\n", otherFormat) +
-                " format_version = 2,\n" +
-                format(" location = '%s',\n", tempDirPath) +
-                " partitioning = ARRAY['adate']\n" +
-                ")");
+        assertQueryFails(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat),
+                "Cannot create a table on a non-empty location.*");
     }
 
     private String getTablePropertiesString(String tableName)
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithExternalLocation.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithExternalLocation.java
index e6377558b28a..ed8963613467 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithExternalLocation.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithExternalLocation.java
@@ -111,6 +111,6 @@ public void testCreateAndDrop()
         assertQuerySucceeds(format("DROP TABLE %s", tableName));
         assertThat(metastore.getTable("tpch", tableName)).as("Table should be dropped").isEmpty();
         assertFalse(fileSystem.exists(new Path(dataFile.getFilePath())), "The data file should have been removed");
-        assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Iceberg table should not be removed because it may be shared with other tables");
+        assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Iceberg table should be removed as we don't allow shared locations.");
     }
 }
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index 33b61df69b7d..b885e6958f5a 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -829,6 +829,45 @@ public void testTrinoShowingSparkCreatedTables(int specVersion)
         onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable));
     }
 
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions")
+    public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion)
+    {
+        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data";
+        String tableSameLocation1 = "test_same_location_spark_1_" + randomTableSuffix();
+        String tableSameLocation2 = "test_same_location_spark_2_" + randomTableSuffix();
+
+        onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
+                sparkTableName(tableSameLocation1), dataPath, specVersion));
+        onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
+                sparkTableName(tableSameLocation2), dataPath, specVersion));
+
+        onSpark().executeQuery(format("DROP TABLE IF EXISTS %s", sparkTableName(tableSameLocation1)));
+
+        assertThat(onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName(tableSameLocation2)))).hasNoRows();
+
+        onSpark().executeQuery(format("DROP TABLE %s", sparkTableName(tableSameLocation2)));
+    }
+
+    @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions")
+    public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion)
+    {
+        String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data";
+        String tableSameLocation1 = "test_same_location_trino_1_" + randomTableSuffix();
+        String tableSameLocation2 = "test_same_location_trino_2_" + randomTableSuffix();
+
+        onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
+                sparkTableName(tableSameLocation1), dataPath, specVersion));
+        onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
+                sparkTableName(tableSameLocation2), dataPath, specVersion));
+
+        onTrino().executeQuery(format("DROP TABLE %s", trinoTableName(tableSameLocation1)));
+
+        assertQueryFailure(() -> onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName(tableSameLocation2))))
+                .hasMessageMatching(".*Failed to open input stream for file.*");
+
+        // Can't clean up tableSameLocation2 as all data and metadata has been removed
+    }
+
     @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
     public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion)
     {