diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 66d285d4d5e8..d2fb2b02d840 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -30,6 +30,7 @@ public class IcebergConfig private HiveCompressionCodec compressionCodec = GZIP; private boolean useFileSizeFromMetadata = true; private int maxPartitionsPerWriter = 100; + private boolean uniqueTableLocation; @NotNull public FileFormat getFileFormat() @@ -91,4 +92,17 @@ public IcebergConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter) this.maxPartitionsPerWriter = maxPartitionsPerWriter; return this; } + + public boolean isUniqueTableLocation() + { + return uniqueTableLocation; + } + + @Config("iceberg.unique-table-location") + @ConfigDescription("Use randomized, unique table locations") + public IcebergConfig setUniqueTableLocation(boolean uniqueTableLocation) + { + this.uniqueTableLocation = uniqueTableLocation; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 52fdef733a1d..6d1ede431326 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -108,7 +108,6 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiPredicate; @@ -166,6 +165,7 @@ import static io.trino.spi.type.BigintType.BIGINT; import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; +import static 
java.util.UUID.randomUUID; import static java.util.function.Function.identity; import static java.util.stream.Collectors.joining; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; @@ -198,6 +198,7 @@ public class IcebergMetadata private final JsonCodec commitTaskCodec; private final HiveTableOperationsProvider tableOperationsProvider; private final String trinoVersion; + private final boolean useUniqueTableLocation; private final Map> snapshotIds = new ConcurrentHashMap<>(); private final Map tableMetadataCache = new ConcurrentHashMap<>(); @@ -212,7 +213,8 @@ public IcebergMetadata( TypeManager typeManager, JsonCodec commitTaskCodec, HiveTableOperationsProvider tableOperationsProvider, - String trinoVersion) + String trinoVersion, + boolean useUniqueTableLocation) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); @@ -221,6 +223,7 @@ public IcebergMetadata( this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); + this.useUniqueTableLocation = useUniqueTableLocation; } @Override @@ -575,7 +578,11 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con HiveIdentity identity = new HiveIdentity(session); String targetPath = getTableLocation(tableMetadata.getProperties()); if (targetPath == null) { - targetPath = getTableDefaultLocation(database, hdfsContext, hdfsEnvironment, schemaName, tableName).toString(); + String tableNameForLocation = tableName; + if (useUniqueTableLocation) { + tableNameForLocation += "-" + randomUUID().toString().replace("-", ""); + } + targetPath = getTableDefaultLocation(database, hdfsContext, hdfsEnvironment, schemaName, tableNameForLocation).toString(); } TableOperations operations = 
tableOperationsProvider.createTableOperations( @@ -1044,7 +1051,7 @@ public void createMaterializedView(ConnectorSession session, SchemaTableName vie // Generate a storage table name and create a storage table. The properties in the definition are table properties for the // storage table as indicated in the materialized view definition. - String storageTableName = "st_" + UUID.randomUUID().toString().replace("-", ""); + String storageTableName = "st_" + randomUUID().toString().replace("-", ""); Map storageTableProperties = new HashMap<>(definition.getProperties()); storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index 9e95cea028c0..9648193117cf 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -33,6 +33,7 @@ public class IcebergMetadataFactory private final JsonCodec commitTaskCodec; private final HiveTableOperationsProvider tableOperationsProvider; private final String trinoVersion; + private final boolean useUniqueTableLocation; @Inject public IcebergMetadataFactory( @@ -45,7 +46,7 @@ public IcebergMetadataFactory( HiveTableOperationsProvider tableOperationsProvider, NodeVersion nodeVersion) { - this(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, tableOperationsProvider, nodeVersion); + this(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskDataJsonCodec, tableOperationsProvider, nodeVersion, config.isUniqueTableLocation()); } public IcebergMetadataFactory( @@ -55,7 +56,8 @@ public IcebergMetadataFactory( TypeManager typeManager, JsonCodec commitTaskCodec, HiveTableOperationsProvider tableOperationsProvider, - NodeVersion nodeVersion) + NodeVersion 
nodeVersion, + boolean useUniqueTableLocation) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); @@ -64,10 +66,11 @@ public IcebergMetadataFactory( this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); + this.useUniqueTableLocation = useUniqueTableLocation; } public IcebergMetadata create() { - return new IcebergMetadata(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskCodec, tableOperationsProvider, trinoVersion); + return new IcebergMetadata(catalogName, metastore, hdfsEnvironment, typeManager, commitTaskCodec, tableOperationsProvider, trinoVersion, useUniqueTableLocation); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 6b3245c75c23..c67a32f12942 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -72,6 +72,17 @@ public static DistributedQueryRunner createIcebergQueryRunner( List> tables, Optional metastoreDirectory) throws Exception + { + return createIcebergQueryRunner(extraProperties, ImmutableMap.of(), format, tables, metastoreDirectory); + } + + public static DistributedQueryRunner createIcebergQueryRunner( + Map extraProperties, + Map connectorProperties, + FileFormat format, + List> tables, + Optional metastoreDirectory) + throws Exception { Session session = testSessionBuilder() @@ -93,6 +104,7 @@ public static DistributedQueryRunner createIcebergQueryRunner( .put("hive.metastore", "file") .put("hive.metastore.catalog.dir", dataDir.toString()) 
.put("iceberg.file-format", format.name()) + .putAll(connectorProperties) .build(); queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index d08196f2056d..fc10ea4230a5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -35,7 +35,8 @@ public void testDefaults() .setFileFormat(ORC) .setCompressionCodec(GZIP) .setUseFileSizeFromMetadata(true) - .setMaxPartitionsPerWriter(100)); + .setMaxPartitionsPerWriter(100) + .setUniqueTableLocation(false)); } @Test @@ -46,13 +47,15 @@ public void testExplicitPropertyMappings() .put("iceberg.compression-codec", "NONE") .put("iceberg.use-file-size-from-metadata", "false") .put("iceberg.max-partitions-per-writer", "222") + .put("iceberg.unique-table-location", "true") .build(); IcebergConfig expected = new IcebergConfig() .setFileFormat(PARQUET) .setCompressionCodec(HiveCompressionCodec.NONE) .setUseFileSizeFromMetadata(false) - .setMaxPartitionsPerWriter(222); + .setMaxPartitionsPerWriter(222) + .setUniqueTableLocation(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithCustomLocation.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithCustomLocation.java new file mode 100644 index 000000000000..93e899ea56b6 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergTableWithCustomLocation.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.hive.metastore.file.FileHiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static io.trino.tpch.TpchTable.NATION; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; + +public class TestIcebergTableWithCustomLocation + extends AbstractTestQueryFramework +{ + private FileHiveMetastore metastore; + private File metastoreDir; + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + metastoreDir = Files.createTempDirectory("test_iceberg").toFile(); + metastore = 
createTestingFileHiveMetastore(metastoreDir); + + return createIcebergQueryRunner( + ImmutableMap.of(), + ImmutableMap.of("iceberg.unique-table-location", "true"), + new IcebergConfig().getFileFormat(), + ImmutableList.of(NATION), + Optional.of(metastoreDir)); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + throws IOException + { + deleteRecursively(metastoreDir.toPath(), ALLOW_INSECURE); + } + + @Test + public void testTableHasUuidSuffixInLocation() + { + String tableName = "table_with_uuid"; + assertQuerySucceeds(format("CREATE TABLE %s as select 1 as val", tableName)); + Optional<Table> table = metastore.getTable(null, "tpch", tableName); + assertTrue(table.isPresent(), "Table should exist"); + String location = table.get().getStorage().getLocation(); + assertThat(location).matches(format(".*%s-[0-9a-f]{32}", tableName)); + } + + @Test + public void testCreateAndDrop() + { + String tableName = "test_create_and_drop"; + assertQuerySucceeds(format("CREATE TABLE %s as select 1 as val", tableName)); + Optional
table = metastore.getTable(null, "tpch", tableName); + assertTrue(table.isPresent(), "Table should exist"); + + assertQuerySucceeds(format("DROP TABLE %s", tableName)); + assertFalse(metastore.getTable(null, "tpch", tableName).isPresent(), "Table should be dropped"); + } + + @Test + public void testCreateRenameDrop() + { + String tableName = "test_create_rename_drop"; + String renamedName = "test_create_rename_drop_renamed"; + assertQuerySucceeds(format("CREATE TABLE %s as select 1 as val", tableName)); + Optional
table = metastore.getTable(null, "tpch", tableName); + assertTrue(table.isPresent(), "Table should exist"); + String tableInitialLocation = table.get().getStorage().getLocation(); + + assertQuerySucceeds(format("ALTER TABLE %s RENAME TO %s", tableName, renamedName)); + Optional
renamedTable = metastore.getTable(null, "tpch", renamedName); + assertTrue(renamedTable.isPresent(), "Table should exist"); + String renamedTableLocation = renamedTable.get().getStorage().getLocation(); + assertEquals(renamedTableLocation, tableInitialLocation, "Location should not be changed"); + + assertQuerySucceeds(format("DROP TABLE %s", renamedName)); + assertFalse(metastore.getTable(null, "tpch", tableName).isPresent(), "Initial table should not exist"); + assertFalse(metastore.getTable(null, "tpch", renamedName).isPresent(), "Renamed table should be dropped"); + } + + @Test + public void testCreateRenameCreate() + { + String tableName = "test_create_rename_create"; + String renamedName = "test_create_rename_create_renamed"; + assertQuerySucceeds(format("CREATE TABLE %s as select 1 as val", tableName)); + Optional
table = metastore.getTable(null, "tpch", tableName); + assertTrue(table.isPresent(), "Table should exist"); + String tableInitialLocation = table.get().getStorage().getLocation(); + + assertQuerySucceeds(format("ALTER TABLE %s RENAME TO %s", tableName, renamedName)); + Optional
renamedTable = metastore.getTable(null, "tpch", renamedName); + assertTrue(renamedTable.isPresent(), "Table should exist"); + String renamedTableLocation = renamedTable.get().getStorage().getLocation(); + assertEquals(renamedTableLocation, tableInitialLocation, "Location should not be changed"); + + assertQuerySucceeds(format("CREATE TABLE %s as select 1 as val", tableName)); + Optional
recreatedTableWithInitialName = metastore.getTable(null, "tpch", tableName); + assertTrue(recreatedTableWithInitialName.isPresent(), "Table should exist"); + String recreatedTableLocation = recreatedTableWithInitialName.get().getStorage().getLocation(); + assertNotEquals(tableInitialLocation, recreatedTableLocation, "Location should be different"); + } +}