Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ is used.
* - ``iceberg.target-max-file-size``
- Target maximum size of written files; the actual size may be larger
- ``1GB``
* - ``iceberg.unique-table-location``
- Use randomized, unique table locations
- ``false``
* - ``iceberg.delete-schema-locations-fallback``
- Whether schema locations should be deleted when Trino can't determine whether they contain external files.
- ``false``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,16 +565,30 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
{
verify(transaction == null, "transaction already set");
transaction = newCreateTableTransaction(catalog, tableMetadata, session);
return new IcebergWritableTableHandle(
tableMetadata.getTable().getSchemaName(),
tableMetadata.getTable().getTableName(),
SchemaParser.toJson(transaction.table().schema()),
PartitionSpecParser.toJson(transaction.table().spec()),
getColumns(transaction.table().schema(), typeManager),
transaction.table().location(),
getFileFormat(transaction.table()),
transaction.table().properties(),
retryMode);
String location = transaction.table().location();
HdfsContext hdfsContext = new HdfsContext(session);
try {
Path path = new Path(location);
FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
if (fileSystem.exists(path) && fileSystem.listFiles(path, true).hasNext()) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format(
"Cannot create a table on a non-empty location: %s, set 'iceberg.unique-table-location=true' in your Iceberg catalog properties " +
"to use unique table locations for every table.", location));
}
return new IcebergWritableTableHandle(
tableMetadata.getTable().getSchemaName(),
tableMetadata.getTable().getTableName(),
SchemaParser.toJson(transaction.table().schema()),
PartitionSpecParser.toJson(transaction.table().spec()),
getColumns(transaction.table().schema(), typeManager),
location,
getFileFormat(transaction.table()),
transaction.table().properties(),
retryMode);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.concat;
Expand Down Expand Up @@ -1021,6 +1022,38 @@ public void testLargeInOnPartitionedColumns()
dropTable("test_in_predicate_large_set");
}

@Test
public void testCreateTableFailsOnNonEmptyPath()
{
String tableName = "test_rename_table_" + randomTableSuffix();
String tmpName = "test_rename_table_tmp_" + randomTableSuffix();
try {
assertUpdate("CREATE TABLE " + tmpName + " AS SELECT 1 as a", 1);
assertUpdate("ALTER TABLE " + tmpName + " RENAME TO " + tableName);
assertQueryFails("CREATE TABLE " + tmpName + " AS SELECT 1 as a", "Cannot create a table on a non-empty location.*");
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tableName);
assertUpdate("DROP TABLE IF EXISTS " + tmpName);
}
}

@Test
public void testCreateTableSucceedsOnEmptyDirectory()
{
File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile();
String tmpName = "test_rename_table_tmp_" + randomTableSuffix();
Path newPath = tempDir.toPath().resolve(tmpName);
File directory = newPath.toFile();
verify(directory.mkdirs(), "Could not make directory on filesystem");
try {
assertUpdate("CREATE TABLE " + tmpName + " WITH (location='" + directory + "') AS SELECT 1 as a", 1);
}
finally {
assertUpdate("DROP TABLE IF EXISTS " + tmpName);
}
}

@Test
public void testCreateTableLike()
{
Expand Down Expand Up @@ -1057,21 +1090,11 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat)
format(" format = '%s',\n format_version = 2,\n location = '%s'\n)", format, tempDir + "/iceberg_data/tpch/test_create_table_like_copy2"));
dropTable("test_create_table_like_copy2");

assertUpdate("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)");
assertEquals(getTablePropertiesString("test_create_table_like_copy3"), "WITH (\n" +
format(" format = '%s',\n", format) +
" format_version = 2,\n" +
format(" location = '%s',\n", tempDirPath) +
" partitioning = ARRAY['adate']\n" +
")");
assertQueryFails("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)",
"Cannot create a table on a non-empty location.*");

assertUpdate(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat));
assertEquals(getTablePropertiesString("test_create_table_like_copy4"), "WITH (\n" +
format(" format = '%s',\n", otherFormat) +
" format_version = 2,\n" +
format(" location = '%s',\n", tempDirPath) +
" partitioning = ARRAY['adate']\n" +
")");
assertQueryFails(format("CREATE TABLE test_create_table_like_copy4 (LIKE test_create_table_like_original INCLUDING PROPERTIES) WITH (format = '%s')", otherFormat),
"Cannot create a table on a non-empty location.*");
}

private String getTablePropertiesString(String tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,6 @@ public void testCreateAndDrop()
assertQuerySucceeds(format("DROP TABLE %s", tableName));
assertThat(metastore.getTable("tpch", tableName)).as("Table should be dropped").isEmpty();
assertFalse(fileSystem.exists(new Path(dataFile.getFilePath())), "The data file should have been removed");
assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Iceberg table should not be removed because it may be shared with other tables");
assertFalse(fileSystem.exists(tableLocation), "The directory corresponding to the dropped Iceberg table should be removed as we don't allow shared locations.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,45 @@ public void testTrinoShowingSparkCreatedTables(int specVersion)
onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions")
public void testCreateAndDropTableWithSameLocationWorksOnSpark(int specVersion)
{
String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data";
String tableSameLocation1 = "test_same_location_spark_1_" + randomTableSuffix();
String tableSameLocation2 = "test_same_location_spark_2_" + randomTableSuffix();

onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
sparkTableName(tableSameLocation1), dataPath, specVersion));
onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
sparkTableName(tableSameLocation2), dataPath, specVersion));

onSpark().executeQuery(format("DROP TABLE IF EXISTS %s", sparkTableName(tableSameLocation1)));

assertThat(onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName(tableSameLocation2)))).hasNoRows();

onSpark().executeQuery(format("DROP TABLE %s", sparkTableName(tableSameLocation2)));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "specVersions")
public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion)
{
String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_create_table_same_location/obj-data";
String tableSameLocation1 = "test_same_location_trino_1_" + randomTableSuffix();
String tableSameLocation2 = "test_same_location_trino_2_" + randomTableSuffix();

onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
sparkTableName(tableSameLocation1), dataPath, specVersion));
onSpark().executeQuery(format("CREATE TABLE %s (_integer INTEGER ) USING ICEBERG LOCATION '%s' TBLPROPERTIES('format-version' = %s)",
sparkTableName(tableSameLocation2), dataPath, specVersion));

onTrino().executeQuery(format("DROP TABLE %s", trinoTableName(tableSameLocation1)));

assertQueryFailure(() -> onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName(tableSameLocation2))))
.hasMessageMatching(".*Failed to open input stream for file.*");

// Can't clean up tableSameLocation2 as all data and metadata has been removed
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion)
{
Expand Down