Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@
*/
package org.apache.iceberg;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SupportsPrefixOperations;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -192,6 +200,15 @@ public Table create() {

String baseLocation = location != null ? location : defaultWarehouseLocation(identifier);
tableProperties.putAll(tableOverrideProperties());

if (Boolean.parseBoolean(
tableProperties.get(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED))) {
boolean conflictLocationDetected = locationInUse(baseLocation, ops.io());
if (conflictLocationDetected) {
throw new AlreadyExistsException("Table location already exists: %s", baseLocation);
}
}

TableMetadata metadata =
TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, tableProperties);

Expand All @@ -204,6 +221,29 @@ public Table create() {
return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
}

/**
 * Reports whether {@code baseLocation} is already occupied, so that a new table is not created
 * on top of existing files.
 *
 * <p>The probe depends on what the given {@link FileIO} supports:
 *
 * <ul>
 *   <li>{@link SupportsPrefixOperations}: the location must exist and listing it as a prefix
 *       must yield at least one entry;
 *   <li>Hadoop {@link Configurable}: the path must exist and a recursive file listing must
 *       yield at least one file;
 *   <li>anything else: only the existence of the location itself can be checked, and a warning
 *       is logged because the answer is a best effort.
 * </ul>
 *
 * @param baseLocation candidate table location
 * @param io file IO used to probe the location
 * @return {@code true} if the location is considered in use
 * @throws UncheckedIOException if the Hadoop file system check fails with an I/O error
 */
private boolean locationInUse(String baseLocation, FileIO io) {
  if (io instanceof SupportsPrefixOperations) {
    // Existence is checked before listing, exactly as before; listing only runs when it exists.
    return io.newInputFile(baseLocation).exists()
        && ((SupportsPrefixOperations) io).listPrefix(baseLocation).iterator().hasNext();
  }

  if (io instanceof Configurable) {
    // This branch talks to the Hadoop FileSystem directly, so no FileIO existence probe is
    // issued here (the previous eager probe was computed and then discarded).
    try {
      Path locationPath = new Path(baseLocation);
      FileSystem fs = Util.getFs(locationPath, ((Configurable) io).getConf());
      return fs.exists(locationPath) && fs.listFiles(locationPath, true).hasNext();
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Failed to check whether location is in use: %s", baseLocation), e);
    }
  }

  LOG.warn(
      "Unable to decide if given location {} is already in use given IO {}",
      baseLocation,
      io);
  return io.newInputFile(baseLocation).exists();
}

@Override
public Transaction createTransaction() {
TableOperations ops = newTableOps(identifier);
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,8 @@ private TableProperties() {}

public static final String UPSERT_ENABLED = "write.upsert.enabled";
public static final boolean UPSERT_ENABLED_DEFAULT = false;

// Table property read by the BaseMetastoreCatalog table builder in create(): when its value
// parses as "true", creation fails with AlreadyExistsException if the target table location is
// found to be in use.
public static final String LOCATION_CONFLICT_DETECTION_ENABLED =
"location.conflict-detection.enabled";
// Declared default for the property above: detection is disabled.
public static final boolean LOCATION_CONFLICT_DETECTION_ENABLED_DEFAULT = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import static org.apache.iceberg.NullOrder.NULLS_FIRST;
import static org.apache.iceberg.SortDirection.ASC;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -36,6 +38,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -45,13 +48,17 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHadoopCatalog extends HadoopTableTestBase {
private static ImmutableMap<String, String> meta = ImmutableMap.of();
Expand Down Expand Up @@ -202,6 +209,46 @@ public void testBasicCatalog() throws Exception {
Assertions.assertThat(fs.isDirectory(new Path(metaLocation))).isFalse();
}

/** Supplies the FileIO implementation class names used by the parameterized tests. */
public static Stream<Arguments> fileIOClassProvider() {
  String hadoopFileIO = HadoopFileIO.class.getCanonicalName();
  String resolvingFileIO = ResolvingFileIO.class.getCanonicalName();
  return Stream.of(Arguments.of(hadoopFileIO), Arguments.of(resolvingFileIO));
}

/**
 * Verifies that, with location conflict detection enabled as a catalog-level table default,
 * creating a table at an occupied location fails, and that overriding the property to
 * {@code "false"} on the builder lets the same creation succeed.
 */
@ParameterizedTest
@MethodSource("fileIOClassProvider")
public void testCreateTableWithLocationConflict(String fileIOClass) throws IOException {
  String detectionDefaultKey =
      String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED);
  ImmutableMap<String, String> props =
      ImmutableMap.of(detectionDefaultKey, "true", CatalogProperties.FILE_IO_IMPL, fileIOClass);
  HadoopCatalog catalog = hadoopCatalog(props);

  // The second identifier names a table "ns1" in "db"; the first table lives under namespace
  // "db.ns1", so the second table's default location is expected to already hold files.
  TableIdentifier nested = TableIdentifier.of("db", "ns1", "tbl");
  TableIdentifier conflicting = TableIdentifier.of("db", "ns1");

  Table first = catalog.buildTable(nested, SCHEMA).create();
  String expectedFirstLocation = catalog.defaultWarehouseLocation(nested);
  assertThat(first.location()).isEqualTo(expectedFirstLocation);

  // Detection is on by default for this catalog, so the conflicting create must be rejected.
  Assertions.assertThatThrownBy(() -> catalog.buildTable(conflicting, SCHEMA).create())
      .isInstanceOf(AlreadyExistsException.class)
      .hasMessageStartingWith("Table location already exists");

  // Explicitly switching detection off for this table allows the create to go through.
  Table second =
      catalog
          .buildTable(conflicting, SCHEMA)
          .withProperty(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false")
          .create();
  String expectedSecondLocation = catalog.defaultWarehouseLocation(conflicting);
  assertThat(second.location()).isEqualTo(expectedSecondLocation);

  catalog.dropTable(nested);
  catalog.dropTable(conflicting);
}

@Test
public void testCreateAndDropTableWithoutNamespace() throws Exception {
HadoopCatalog catalog = hadoopCatalog();
Expand Down
61 changes: 61 additions & 0 deletions core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.Namespace;
Expand All @@ -57,8 +58,10 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -73,6 +76,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestJdbcCatalog extends CatalogTests<JdbcCatalog> {

Expand Down Expand Up @@ -285,6 +291,61 @@ public void testBasicCatalog() throws Exception {
catalog.dropTable(testTable);
}

/** Provides one argument set per FileIO implementation class name exercised by the tests. */
public static Stream<Arguments> fileIOClassProvider() {
  return Stream.of(HadoopFileIO.class, ResolvingFileIO.class)
      .map(ioClass -> Arguments.of(ioClass.getCanonicalName()));
}

/**
 * Verifies location conflict detection for the JDBC catalog: after a table is renamed, creating
 * a new table at the original table's location is rejected while detection is enabled, and is
 * accepted once the property is set to {@code "false"} for that table.
 */
@ParameterizedTest
@MethodSource("fileIOClassProvider")
public void testCreateTableWithLocationConflict(String fileIOClass) throws IOException {
  String detectionDefaultKey =
      String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED);
  ImmutableMap<String, String> catalogProps =
      ImmutableMap.of(detectionDefaultKey, "true", CatalogProperties.FILE_IO_IMPL, fileIOClass);

  try (JdbcCatalog jdbcCatalog = initCatalog("unique_jdbc_catalog", catalogProps)) {
    Namespace namespace = Namespace.of("testDb", "ns1", "ns2");
    jdbcCatalog.createNamespace(namespace, Maps.newHashMap());

    TableIdentifier originalIdent = TableIdentifier.of(namespace, "original");
    TableIdentifier renamedIdent = TableIdentifier.of(namespace, "renamed");

    Table created = jdbcCatalog.createTable(originalIdent, SCHEMA, PartitionSpec.unpartitioned());
    String occupiedLocation = created.location();

    // Sanity check that the table directory exists before the rename.
    Path occupiedPath = new Path(occupiedLocation);
    FileSystem fs = Util.getFs(occupiedPath, conf);
    assertThat(fs.isDirectory(new Path(occupiedLocation))).isTrue();

    jdbcCatalog.renameTable(originalIdent, renamedIdent);

    // The original identifier is free again, but its old location is requested explicitly.
    Assertions.assertThatThrownBy(
            () ->
                jdbcCatalog.createTable(
                    originalIdent,
                    SCHEMA,
                    PartitionSpec.unpartitioned(),
                    occupiedLocation,
                    ImmutableMap.of()))
        .isInstanceOf(AlreadyExistsException.class)
        .hasMessageStartingWith("Table location already exists");

    // With detection turned off for this table, the same request succeeds.
    ImmutableMap<String, String> detectionOff =
        ImmutableMap.of(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false");
    Table recreated =
        jdbcCatalog.createTable(
            originalIdent, SCHEMA, PartitionSpec.unpartitioned(), occupiedLocation, detectionOff);
    assertThat(recreated.location()).isEqualTo(occupiedLocation);

    jdbcCatalog.dropTable(renamedIdent);
    jdbcCatalog.dropTable(originalIdent);
  }
}

@Test
public void testCreateAndDropTableWithoutNamespace() throws Exception {
TableIdentifier testTable = TableIdentifier.of("tbl");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
Expand Down Expand Up @@ -74,6 +75,8 @@
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand All @@ -84,6 +87,9 @@
import org.apache.thrift.TException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class TestHiveCatalog extends HiveMetastoreTest {
private static ImmutableMap meta =
Expand Down Expand Up @@ -1162,6 +1168,54 @@ public void testDatabaseLocationWithSlashInWarehouseDir() {
assertThat(database.getLocationUri()).isEqualTo("s3://bucket/database.db");
}

/** Lists the FileIO implementation class names that the parameterized tests run against. */
public static Stream<Arguments> fileIOClassProvider() {
  Arguments hadoopIO = Arguments.of(HadoopFileIO.class.getCanonicalName());
  Arguments resolvingIO = Arguments.of(ResolvingFileIO.class.getCanonicalName());
  return Stream.of(hadoopIO, resolvingIO);
}

/**
 * Verifies location conflict detection for the Hive catalog: after a table is renamed, creating
 * a new table under the original identifier is rejected while detection is enabled, and is
 * accepted once the property is set to {@code "false"} on the builder.
 *
 * <p>Every step, including the rename, goes through the catalog instance configured for this
 * test, so the scenario exercises one catalog consistently.
 */
@ParameterizedTest
@MethodSource("fileIOClassProvider")
public void testCreateTableWithLocationConflict(String fileIOClass) throws IOException {
  Schema schema = getTestSchema();
  TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "original");
  TableIdentifier tableRenamed = TableIdentifier.of(DB_NAME, "renamed");

  // Enable detection for all tables of this catalog via the table-default prefix.
  ImmutableMap<String, String> catalogProps =
      ImmutableMap.of(
          String.format("table-default.%s", TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED),
          "true",
          CatalogProperties.FILE_IO_IMPL,
          fileIOClass);
  Catalog hiveCatalog =
      CatalogUtil.loadCatalog(
          HiveCatalog.class.getName(),
          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
          catalogProps,
          hiveConf);

  try {
    Table table = hiveCatalog.buildTable(tableIdent, schema).create();
    String currentLocation = table.location();

    // Rename through the catalog under test rather than the shared fixture catalog.
    hiveCatalog.renameTable(tableIdent, tableRenamed);

    assertThatThrownBy(() -> hiveCatalog.buildTable(tableIdent, schema).create())
        .isInstanceOf(AlreadyExistsException.class)
        .hasMessageStartingWith("Table location already exists");

    // Disabling detection for this table allows it to be created at the occupied location.
    Table recreated =
        hiveCatalog
            .buildTable(tableIdent, schema)
            .withProperty(TableProperties.LOCATION_CONFLICT_DETECTION_ENABLED, "false")
            .create();
    assertThat(recreated.location()).isEqualTo(currentLocation);
  } finally {
    // Purge both tables regardless of the outcome so later tests start clean.
    hiveCatalog.dropTable(tableRenamed, true);
    hiveCatalog.dropTable(tableIdent, true);
  }
}

@Test
public void testRegisterTable() {
TableIdentifier identifier = TableIdentifier.of(DB_NAME, "t1");
Expand Down