diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index a683533473be..05c4bd97a560 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -18,13 +18,21 @@ */ package org.apache.iceberg; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -192,6 +200,15 @@ public Table create() { String baseLocation = location != null ? location : defaultWarehouseLocation(identifier); tableProperties.putAll(tableOverrideProperties()); + + if (Boolean.parseBoolean( + tableProperties.get(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED))) { + boolean conflictLocationDetected = locationInUse(baseLocation, ops.io()); + if (conflictLocationDetected) { + throw new AlreadyExistsException("Table location already exists: %s", baseLocation); + } + } + TableMetadata metadata = TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, tableProperties); @@ -204,6 +221,29 @@ public Table create() { return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter()); } + private boolean locationInUse(String baseLocation, FileIO io) { + boolean directionExist = io.newInputFile(baseLocation).exists(); + if (io instanceof SupportsPrefixOperations) { + return directionExist + && ((SupportsPrefixOperations) io).listPrefix(baseLocation).iterator().hasNext(); + } else if (io instanceof Configurable) { + try { + Path locationPath = new Path(baseLocation); + FileSystem fs = Util.getFs(locationPath, ((Configurable) io).getConf()); + return fs.exists(locationPath) && fs.listFiles(locationPath, true).hasNext(); + } catch (IOException e) { + throw new UncheckedIOException( + String.format("Failed to get file system for location: %s", baseLocation), e); + } + } else { + LOG.warn( + "Unable to decide if given location {} is already in used given IO {}", + baseLocation, + io); + return directionExist; + } + } + @Override public Transaction createTransaction() { TableOperations ops = newTableOps(identifier); diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index a9116bc57f83..16fb78b88917 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -365,4 +365,8 @@ private TableProperties() {} public static final String UPSERT_ENABLED = "write.upsert.enabled"; public static final boolean UPSERT_ENABLED_DEFAULT = false; + + public static final String LOCATION_CONFLICT_DETECTION_ENABLED = + "location.conflict-detection.enabled"; + public static final boolean LOCATION_CONFLICT_DETECTION_ENABLED_DEFAULT = false; } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java index 4f889b24cae8..72bc78980a89 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCatalog.java @@ -20,11 +20,13 @@ import static org.apache.iceberg.NullOrder.NULLS_FIRST; import static org.apache.iceberg.SortDirection.ASC; +import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,6 +38,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -45,6 +48,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -52,6 +56,9 @@ import org.apache.iceberg.transforms.Transforms; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class TestHadoopCatalog extends HadoopTableTestBase { private static ImmutableMap meta = ImmutableMap.of(); @@ -202,6 +209,46 @@ public void testBasicCatalog() throws Exception { Assertions.assertThat(fs.isDirectory(new Path(metaLocation))).isFalse(); } + public static Stream fileIOClassProvider() { + return Stream.of( + Arguments.of(HadoopFileIO.class.getCanonicalName()), + Arguments.of(ResolvingFileIO.class.getCanonicalName())); + } + + @ParameterizedTest + @MethodSource("fileIOClassProvider") + public void testCreateTableWithLocationConflict(String fileIOClass) throws IOException { + TableIdentifier tableIdent = TableIdentifier.of("db", "ns1", "tbl"); + TableIdentifier tableIdentWithLocationConflict = TableIdentifier.of("db", "ns1"); + ImmutableMap catalogProps = + ImmutableMap.of( + String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED), + "true", + CatalogProperties.FILE_IO_IMPL, + fileIOClass); + + HadoopCatalog catalog = hadoopCatalog(catalogProps); + + Table original = catalog.buildTable(tableIdent, SCHEMA).create(); + assertThat(original.location()).isEqualTo(catalog.defaultWarehouseLocation(tableIdent)); + + Assertions.assertThatThrownBy( + () -> catalog.buildTable(tableIdentWithLocationConflict, SCHEMA).create()) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("Table location already exists"); + + Table withLocationConflict = + catalog + .buildTable(tableIdentWithLocationConflict, SCHEMA) + .withProperty(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false") + .create(); + assertThat(withLocationConflict.location()) + .isEqualTo(catalog.defaultWarehouseLocation(tableIdentWithLocationConflict)); + + catalog.dropTable(tableIdent); + catalog.dropTable(tableIdentWithLocationConflict); + } + @Test public void testCreateAndDropTableWithoutNamespace() throws Exception { HadoopCatalog catalog = hadoopCatalog(); diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java index 4634de57073d..e82f92c12fc2 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java @@ -49,6 +49,7 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.Namespace; @@ -57,8 +58,10 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.metrics.MetricsReport; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -73,6 +76,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class TestJdbcCatalog extends CatalogTests { @@ -285,6 +291,61 @@ public void testBasicCatalog() throws Exception { catalog.dropTable(testTable); } + public static Stream fileIOClassProvider() { + return Stream.of( + Arguments.of(HadoopFileIO.class.getCanonicalName()), + Arguments.of(ResolvingFileIO.class.getCanonicalName())); + } + + @ParameterizedTest + @MethodSource("fileIOClassProvider") + public void testCreateTableWithLocationConflict(String fileIOClass) throws IOException { + try (JdbcCatalog jdbcCatalog = + initCatalog( + "unique_jdbc_catalog", + ImmutableMap.of( + String.format( + "table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED), + "true", + CatalogProperties.FILE_IO_IMPL, + fileIOClass))) { + Namespace testNamespace = Namespace.of("testDb", "ns1", "ns2"); + jdbcCatalog.createNamespace(testNamespace, Maps.newHashMap()); + TableIdentifier tableIdent = TableIdentifier.of(testNamespace, "original"); + TableIdentifier tableRenamed = TableIdentifier.of(testNamespace, "renamed"); + + Table table = jdbcCatalog.createTable(tableIdent, SCHEMA, PartitionSpec.unpartitioned()); + String currentLocation = table.location(); + + FileSystem fs = Util.getFs(new Path(currentLocation), conf); + assertThat(fs.isDirectory(new Path(currentLocation))).isTrue(); + jdbcCatalog.renameTable(tableIdent, tableRenamed); + + Assertions.assertThatThrownBy( + () -> + jdbcCatalog.createTable( + tableIdent, + SCHEMA, + PartitionSpec.unpartitioned(), + currentLocation, + ImmutableMap.of())) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("Table location already exists"); + + Table recreated = + jdbcCatalog.createTable( + tableIdent, + SCHEMA, + PartitionSpec.unpartitioned(), + currentLocation, + ImmutableMap.of(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false")); + assertThat(recreated.location()).isEqualTo(currentLocation); + + jdbcCatalog.dropTable(tableRenamed); + jdbcCatalog.dropTable(tableIdent); + } + } + @Test public void testCreateAndDropTableWithoutNamespace() throws Exception { TableIdentifier testTable = TableIdentifier.of("tbl"); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index d4ac49868488..b22ba667568a 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; @@ -74,6 +75,8 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.ResolvingFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -84,6 +87,9 @@ import org.apache.thrift.TException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class TestHiveCatalog extends HiveMetastoreTest { private static ImmutableMap meta = @@ -1162,6 +1168,54 @@ public void testDatabaseLocationWithSlashInWarehouseDir() { assertThat(database.getLocationUri()).isEqualTo("s3://bucket/database.db"); } + public static Stream fileIOClassProvider() { + return Stream.of( + Arguments.of(HadoopFileIO.class.getCanonicalName()), + Arguments.of(ResolvingFileIO.class.getCanonicalName())); + } + + @ParameterizedTest + @MethodSource("fileIOClassProvider") + public void testCreateTableWithLocationConflict(String fileIOClass) throws IOException { + Schema schema = getTestSchema(); + TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "original"); + TableIdentifier tableRenamed = TableIdentifier.of(DB_NAME, "renamed"); + + ImmutableMap catalogProps = + ImmutableMap.of( + String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED), + "true", + CatalogProperties.FILE_IO_IMPL, + fileIOClass); + Catalog hiveCatalog = + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), + CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, + catalogProps, + hiveConf); + + try { + Table table = hiveCatalog.buildTable(tableIdent, schema).create(); + String currentLocation = table.location(); + + catalog.renameTable(tableIdent, tableRenamed); + + assertThatThrownBy(() -> hiveCatalog.buildTable(tableIdent, schema).create()) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageStartingWith("Table location already exists"); + + Table recreated = + hiveCatalog + .buildTable(tableIdent, schema) + .withProperty(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false") + .create(); + assertThat(recreated.location()).isEqualTo(currentLocation); + } finally { + hiveCatalog.dropTable(tableRenamed, true); + hiveCatalog.dropTable(tableIdent, true); + } + } + @Test public void testRegisterTable() { TableIdentifier identifier = TableIdentifier.of(DB_NAME, "t1");