Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.RowChangeParadigm;
import io.trino.spi.connector.SchemaNotFoundException;
Comment thread
krvikash marked this conversation as resolved.
Outdated
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -700,6 +701,10 @@ public Optional<ConnectorTableLayout> getNewTableLayout(ConnectorSession session
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
verify(transaction == null, "transaction already set");
String schemaName = tableMetadata.getTable().getSchemaName();
if (!schemaExists(session, schemaName)) {
throw new SchemaNotFoundException(schemaName);
}
transaction = newCreateTableTransaction(catalog, tableMetadata, session);
String location = transaction.table().location();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,25 @@ public void testUnregisterTableAccessControl()
assertUpdate("DROP TABLE " + tableName);
}

// Creating a table in a schema that does not exist must fail with a
// "Schema ... not found" error and must not leave any files behind at the
// requested table location — covers both plain CREATE TABLE and CTAS.
@Test
public void testCreateTableWithNonExistingSchemaVerifyLocation()
{
    String schema = "non_existing_schema_" + randomNameSuffix();
    String table = "test_create_table_in_non_existent_schema_" + randomNameSuffix();
    String location = schemaPath() + "/" + table;
    String expectedFailure = "Schema (.*) not found";

    // Plain CREATE TABLE with an explicit location
    String createTable = "CREATE TABLE " + schema + "." + table + " (a int, b int) WITH (location = '" + location + "')";
    assertQueryFails(createTable, expectedFailure);
    assertThat(locationExists(location))
            .as("location should not exist").isFalse();

    // CREATE TABLE ... AS with an explicit location
    String createTableAsSelect = "CREATE TABLE " + schema + "." + table + " (a, b) WITH (location = '" + location + "') AS VALUES (1, 2), (3, 4)";
    assertQueryFails(createTableAsSelect, expectedFailure);
    assertThat(locationExists(location))
            .as("location should not exist").isFalse();
}

private String getTableLocation(String tableName)
{
return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName);
Expand All @@ -413,4 +432,8 @@ protected String getColumnComment(String tableName, String columnName)
protected abstract void dropTableFromMetastore(String tableName);

protected abstract String getMetadataLocation(String tableName);

protected abstract String schemaPath();

protected abstract boolean locationExists(String location);
Comment thread
krvikash marked this conversation as resolved.
Outdated
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,19 @@ protected String getMetadataLocation(String tableName)
.getParameters().get("metadata_location");
}

// Root of the test schema on the MinIO-backed S3 bucket.
@Override
protected String schemaPath()
{
    return "s3://" + bucketName + "/" + schemaName;
}

// A location "exists" when at least one object is stored under it.
// The s3://<bucket>/ scheme prefix is stripped first because listFiles
// expects a bucket-relative key prefix.
@Override
protected boolean locationExists(String location)
{
    String bucketPrefix = "s3://" + bucketName + "/";
    String keyPrefix = location.substring(bucketPrefix.length());
    return !hiveMinioDataLake.listFiles(keyPrefix).isEmpty();
}

@Override
protected void deleteDirectory(String location)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,6 @@ public DistributedQueryRunner build()
icebergProperties.put("iceberg.catalog.type", "TESTING_FILE_METASTORE");
icebergProperties.put("hive.metastore.catalog.dir", dataDir.toString());
}
if ("jdbc".equalsIgnoreCase(catalogType) && !icebergProperties.containsKey("iceberg.jdbc-catalog.default-warehouse-dir")) {
icebergProperties.put("iceberg.jdbc-catalog.default-warehouse-dir", dataDir.toString());
}

queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ protected String getMetadataLocation(String tableName)
.getParameters().get("metadata_location");
}

// Root of the test schema inside the ABFS (Azure Blob) container.
@Override
protected String schemaPath()
{
    String containerRoot = formatAbfsUrl(container, account, bucketName);
    return containerRoot + schemaName;
}

// `hadoop fs -test -d <path>` exits with 0 only when the path exists and
// is a directory; any other exit code means the location is absent.
@Override
protected boolean locationExists(String location)
{
    int exitCode = hiveHadoop.executeInContainer("hadoop", "fs", "-test", "-d", location).getExitCode();
    return exitCode == 0;
}

@Override
protected void deleteDirectory(String location)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore;
import static java.lang.String.format;
import static org.apache.iceberg.FileFormat.ORC;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -79,6 +80,18 @@ protected String getMetadataLocation(String tableName)
.getParameters().get("metadata_location");
}

// Local filesystem path of the current test schema under the metastore directory.
@Override
protected String schemaPath()
{
    String schema = getSession().getSchema().orElseThrow();
    return metastoreDir + "/" + schema;
}

// Existence check against the local filesystem.
@Override
protected boolean locationExists(String location)
{
    Path path = Path.of(location);
    return Files.exists(path);
}

@Override
protected void deleteDirectory(String location)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected QueryRunner createQueryRunner()
Configuration configuration = ConfigurationInstantiator.newEmptyConfiguration();
new GoogleGcsConfigurationInitializer(gcsConfig).initializeConfiguration(configuration);

this.fileSystem = FileSystem.newInstance(new URI(schemaUrl()), configuration);
this.fileSystem = FileSystem.newInstance(new URI(schemaPath()), configuration);
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -124,7 +124,7 @@ protected QueryRunner createQueryRunner()
SchemaInitializer.builder()
.withClonedTpchTables(REQUIRED_TPCH_TABLES)
.withSchemaName(schema)
.withSchemaProperties(ImmutableMap.of("location", "'" + schemaUrl() + "'"))
.withSchemaProperties(ImmutableMap.of("location", "'" + schemaPath() + "'"))
.build())
.build();
}
Expand All @@ -134,11 +134,11 @@ public void removeTestData()
{
if (fileSystem != null) {
try {
fileSystem.delete(new org.apache.hadoop.fs.Path(schemaUrl()), true);
fileSystem.delete(new org.apache.hadoop.fs.Path(schemaPath()), true);
}
catch (IOException e) {
// The GCS bucket should be configured to expire objects automatically. Clean up issues do not need to fail the test.
LOG.warn(e, "Failed to clean up GCS test directory: %s", schemaUrl());
LOG.warn(e, "Failed to clean up GCS test directory: %s", schemaPath());
}
fileSystem = null;
}
Expand All @@ -160,14 +160,26 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
// NOTE(review): the pasted diff carried both the pre-change (schemaUrl())
// and post-change (schemaPath()) return lines; only the merged version is kept.
@Override
protected String createSchemaSql(String schema)
{
    // Pin the schema location explicitly so it lives under the GCS test bucket.
    return format("CREATE SCHEMA %1$s WITH (location = '%2$s%1$s')", schema, schemaPath());
}

private String schemaUrl()
@Override
protected String schemaPath()
{
return format("gs://%s/%s/", gcpStorageBucket, schema);
}

// Existence check through the Hadoop GCS FileSystem client.
@Override
protected boolean locationExists(String location)
{
    org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
    try {
        return fileSystem.exists(path);
    }
    catch (IOException e) {
        // Propagate as unchecked — test helpers have no meaningful recovery.
        throw new UncheckedIOException(e);
    }
}

@Test
@Override
public void testRenameSchema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public void testCreateTable()
ImmutableMultiset.builder()
.add(CREATE_TABLE)
.add(GET_DATABASE)
.add(GET_DATABASE)
.add(GET_TABLE)
.build());
}
Expand All @@ -158,6 +159,7 @@ public void testCreateTableAsSelect()
try {
assertGlueMetastoreApiInvocations("CREATE TABLE test_ctas AS SELECT 1 AS age",
ImmutableMultiset.builder()
.add(GET_DATABASE)
.add(GET_DATABASE)
.add(CREATE_TABLE)
.add(GET_TABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,22 @@ protected void deleteDirectory(String location)
assertThat(s3.listObjects(bucketName, location).getObjectSummaries()).isEmpty();
}

private String schemaPath()
@Override
protected String schemaPath()
{
return format("s3://%s/%s", bucketName, schemaName);
}

// Treats a location as existing when the bucket contains at least one object
// whose key starts with the bucket-relative form of the location.
@Override
protected boolean locationExists(String location)
{
    String schemePrefix = "s3://" + bucketName + "/";
    String keyPrefix = location.substring(schemePrefix.length());
    AmazonS3 client = AmazonS3ClientBuilder.standard().build();
    // maxKeys = 1: a single match is enough to answer the question
    ListObjectsV2Request listRequest = new ListObjectsV2Request()
            .withBucketName(bucketName)
            .withPrefix(keyPrefix)
            .withMaxKeys(1);
    return !client.listObjectsV2(listRequest).getObjectSummaries().isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestIcebergJdbcCatalogConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
private File warehouseLocation;

public TestIcebergJdbcCatalogConnectorSmokeTest()
{
super(new IcebergConfig().getFileFormat().toIceberg());
Expand All @@ -48,6 +57,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
protected QueryRunner createQueryRunner()
throws Exception
{
warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_catalog_smoke_test").toFile();
Comment thread
krvikash marked this conversation as resolved.
Outdated
Comment thread
ebyhr marked this conversation as resolved.
Outdated
closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE));
TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer());
return IcebergQueryRunner.builder()
.setIcebergProperties(
Expand All @@ -59,6 +70,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.jdbc-catalog.connection-password", PASSWORD)
.put("iceberg.jdbc-catalog.catalog-name", "tpch")
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.getAbsolutePath())
.buildOrThrow())
.setInitialTables(REQUIRED_TPCH_TABLES)
.build();
Expand Down Expand Up @@ -98,6 +110,18 @@ protected String getMetadataLocation(String tableName)
throw new UnsupportedOperationException("metadata location for register_table is not supported");
}

// Path of the current test schema under the JDBC catalog's warehouse directory.
@Override
protected String schemaPath()
{
    String schema = getSession().getSchema().orElseThrow();
    return warehouseLocation + "/" + schema;
}

// Existence check against the local filesystem backing the warehouse dir.
@Override
protected boolean locationExists(String location)
{
    Path path = Path.of(location);
    return Files.exists(path);
}

@Override
public void testRegisterTableWithTableLocation()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
import io.trino.tpch.TpchTable;
import org.testng.annotations.Test;

import java.io.File;
import java.nio.file.Files;
import java.util.Optional;
import java.util.OptionalInt;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.PASSWORD;
import static io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer.USER;
import static io.trino.tpch.TpchTable.LINE_ITEM;
Expand All @@ -34,6 +39,8 @@
public class TestIcebergJdbcConnectorTest
extends BaseIcebergConnectorTest
{
private File warehouseLocation;

public TestIcebergJdbcConnectorTest()
{
super(new IcebergConfig().getFileFormat());
Expand All @@ -43,8 +50,11 @@ public TestIcebergJdbcConnectorTest()
protected QueryRunner createQueryRunner()
throws Exception
{
warehouseLocation = Files.createTempDirectory("test_iceberg_jdbc_connector_test").toFile();
closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE));
TestingIcebergJdbcServer server = closeAfterClass(new TestingIcebergJdbcServer());
return IcebergQueryRunner.builder()
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.setIcebergProperties(
ImmutableMap.<String, String>builder()
.put("iceberg.file-format", format.name())
Expand All @@ -53,6 +63,7 @@ protected QueryRunner createQueryRunner()
.put("iceberg.jdbc-catalog.connection-user", USER)
.put("iceberg.jdbc-catalog.connection-password", PASSWORD)
.put("iceberg.jdbc-catalog.catalog-name", "tpch")
.put("iceberg.jdbc-catalog.default-warehouse-dir", warehouseLocation.toPath().resolve("iceberg_data").toFile().getAbsolutePath())
.buildOrThrow())
.setInitialTables(ImmutableList.<TpchTable<?>>builder()
.addAll(REQUIRED_TPCH_TABLES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,20 @@
import org.assertj.core.util.Files;

import java.io.File;
import java.nio.file.Path;
import java.util.Optional;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestIcebergRestCatalogConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
private File warehouseLocation;

public TestIcebergRestCatalogConnectorSmokeTest()
{
super(new IcebergConfig().getFileFormat().toIceberg());
Expand All @@ -56,7 +60,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
protected QueryRunner createQueryRunner()
throws Exception
{
File warehouseLocation = Files.newTemporaryFolder();
warehouseLocation = Files.newTemporaryFolder();
closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE));

Catalog backend = backendCatalog(warehouseLocation);
Expand Down Expand Up @@ -116,6 +120,18 @@ protected String getMetadataLocation(String tableName)
throw new UnsupportedOperationException("metadata location for register_table is not supported");
}

// Path of the current test schema under the REST catalog's warehouse directory.
@Override
protected String schemaPath()
{
    // BUG FIX: getSchema() returns Optional<String>; formatting the Optional
    // directly yields a path like ".../Optional[tpch]" instead of ".../tpch".
    // Unwrap with orElseThrow(), matching the sibling catalog implementations.
    return format("%s/%s", warehouseLocation, getSession().getSchema().orElseThrow());
}

// Existence check against the local filesystem. java.nio.file.Files is fully
// qualified because this file imports org.assertj.core.util.Files.
@Override
protected boolean locationExists(String location)
{
    Path path = Path.of(location);
    return java.nio.file.Files.exists(path);
}

@Override
public void testRegisterTableWithTableLocation()
{
Expand Down