Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,6 @@ jobs:
(env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.AWS_ACCESS_KEY_ID != '' || env.AWS_SECRET_ACCESS_KEY != '' || env.GCP_CREDENTIALS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-iceberg ${{ format('-P {0}', matrix.profile) }} \
-Ds3.bucket=${S3_BUCKET} \
-Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}" \
-Dtesting.gcp-credentials-key="${GCP_CREDENTIALS_KEY}" \
-Dhive.hadoop2.azure-abfs-container="${ABFS_CONTAINER}" \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@
import static com.google.common.collect.Sets.difference;
import static com.google.common.primitives.Ints.max;
import static io.trino.filesystem.Locations.appendPath;
import static io.trino.filesystem.Locations.getParent;
import static io.trino.plugin.deltalake.DataFileInfo.DataFileType.DATA;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH;
import static io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties.AnalyzeMode.INCREMENTAL;
Expand Down Expand Up @@ -1903,7 +1902,7 @@ private void checkWriteAllowed(ConnectorSession session, DeltaLakeTableHandle ta
private boolean allowWrite(ConnectorSession session, DeltaLakeTableHandle tableHandle)
{
try {
String tableMetadataDirectory = appendPath(getParent(tableHandle.getLocation()), tableHandle.getTableName());
String tableMetadataDirectory = getTransactionLogDir(tableHandle.getLocation());
boolean requiresOptIn = transactionLogWriterFactory.newWriter(session, tableMetadataDirectory).isUnsafe();
return !requiresOptIn || unsafeWritesEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.plugin.deltalake.metastore.glue;

import com.google.common.collect.ImmutableMap;
import io.trino.hdfs.TrinoFileSystemCache;
import io.trino.plugin.deltalake.DeltaLakeQueryRunner;
import io.trino.plugin.hive.BaseS3AndGlueMetastoreTest;
import io.trino.testing.DistributedQueryRunner;
Expand Down Expand Up @@ -42,8 +41,6 @@ public TestDeltaS3AndGlueMetastoreTest()
protected QueryRunner createQueryRunner()
throws Exception
{
closeAfterClass(TrinoFileSystemCache.INSTANCE::closeAll);

metastore = createTestingGlueHiveMetastore(Path.of(schemaPath()));
DistributedQueryRunner queryRunner = DeltaLakeQueryRunner.builder()
.setCatalogName(DELTA_CATALOG)
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.hdfs.TrinoFileSystemCache;
import io.trino.spi.security.Identity;
import io.trino.spi.security.SelectedRole;
import io.trino.testing.DistributedQueryRunner;
Expand All @@ -27,6 +26,9 @@
import java.util.Optional;
import java.util.Set;

import static io.trino.plugin.hive.BaseS3AndGlueMetastoreTest.LocationPattern.DOUBLE_SLASH;
import static io.trino.plugin.hive.BaseS3AndGlueMetastoreTest.LocationPattern.TRIPLE_SLASH;
import static io.trino.plugin.hive.BaseS3AndGlueMetastoreTest.LocationPattern.TWO_TRAILING_SLASHES;
import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createTestingGlueHiveMetastore;
import static io.trino.spi.security.SelectedRole.Type.ROLE;
import static io.trino.testing.TestingNames.randomNameSuffix;
Expand All @@ -47,8 +49,6 @@ public TestHiveS3AndGlueMetastoreTest()
protected QueryRunner createQueryRunner()
throws Exception
{
closeAfterClass(TrinoFileSystemCache.INSTANCE::closeAll);

metastore = createTestingGlueHiveMetastore(Path.of(schemaPath()));

Session session = createSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin"))));
Expand Down Expand Up @@ -115,79 +115,80 @@ protected void validateFilesAfterOptimize(String location, Set<String> initialFi

@Override // Row-level modifications are not supported for Hive tables
@Test(dataProvider = "locationPatternsDataProvider")
public void testBasicOperationsWithProvidedTableLocation(boolean partitioned, String locationPattern)
public void testBasicOperationsWithProvidedTableLocation(boolean partitioned, LocationPattern locationPattern)
{
String tableName = "test_basic_operations_" + randomNameSuffix();
String location = locationPattern.formatted(bucketName, schemaName, tableName);
String location = locationPattern.locationForTable(bucketName, schemaName, tableName);
String partitionQueryPart = (partitioned ? ",partitioned_by = ARRAY['col_int']" : "");

String create = "CREATE TABLE " + tableName + "(col_str, col_int)" +
"WITH (external_location = '" + location + "'" + partitionQueryPart + ") " +
"AS VALUES ('str1', 1), ('str2', 2), ('str3', 3)";
if (locationPattern.contains("double_slash")) {
if (locationPattern == DOUBLE_SLASH || locationPattern == TRIPLE_SLASH || locationPattern == TWO_TRAILING_SLASHES) {
assertQueryFails(create, "\\QUnsupported location that cannot be internally represented: " + location);
return;
}
assertUpdate(create, 3);
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3)");

String actualTableLocation = getTableLocation(tableName);
assertThat(actualTableLocation).isEqualTo(location);
try (UncheckedCloseable ignored = onClose("DROP TABLE " + tableName)) {
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3)");

assertUpdate("INSERT INTO " + tableName + " VALUES ('str4', 4)", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)");
String actualTableLocation = getTableLocation(tableName);
assertThat(actualTableLocation).isEqualTo(location);

assertThat(getTableFiles(actualTableLocation)).isNotEmpty();
validateDataFiles(partitioned ? "col_int" : "", tableName, actualTableLocation);
assertUpdate("INSERT INTO " + tableName + " VALUES ('str4', 4)", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)");

assertUpdate("DROP TABLE " + tableName);
assertThat(getTableFiles(actualTableLocation)).isNotEmpty();
validateDataFiles(partitioned ? "col_int" : "", tableName, actualTableLocation);
}
}

@Override // Row-level modifications are not supported for Hive tables
@Test(dataProvider = "locationPatternsDataProvider")
public void testBasicOperationsWithProvidedSchemaLocation(boolean partitioned, String locationPattern)
public void testBasicOperationsWithProvidedSchemaLocation(boolean partitioned, LocationPattern locationPattern)
{
String schemaName = "test_basic_operations_schema_" + randomNameSuffix();
String schemaLocation = locationPattern.formatted(bucketName, schemaName, schemaName);
String schemaLocation = locationPattern.locationForSchema(bucketName, schemaName);
String tableName = "test_basic_operations_table_" + randomNameSuffix();
String qualifiedTableName = schemaName + "." + tableName;
String partitionQueryPart = (partitioned ? " WITH (partitioned_by = ARRAY['col_int'])" : "");

String actualTableLocation;
assertUpdate("CREATE SCHEMA " + schemaName + " WITH (location = '" + schemaLocation + "')");
assertThat(getSchemaLocation(schemaName)).isEqualTo(schemaLocation);

assertUpdate("CREATE TABLE " + qualifiedTableName + "(col_str varchar, col_int int)" + partitionQueryPart);
String expectedTableLocation = ((schemaLocation.endsWith("/") ? schemaLocation : schemaLocation + "/") + tableName)
// Hive normalizes double slash
.replaceAll("(?<!(s3:))//", "/");

String actualTableLocation = metastore.getTable(schemaName, tableName).orElseThrow().getStorage().getLocation();
assertThat(actualTableLocation).matches(expectedTableLocation);
try (UncheckedCloseable ignoredDropSchema = onClose("DROP SCHEMA " + schemaName)) {
assertThat(getSchemaLocation(schemaName)).isEqualTo(schemaLocation);

assertUpdate("INSERT INTO " + qualifiedTableName + " VALUES ('str1', 1), ('str2', 2), ('str3', 3)", 3);
assertQuery("SELECT * FROM " + qualifiedTableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3)");
assertUpdate("CREATE TABLE " + qualifiedTableName + "(col_str varchar, col_int int)" + partitionQueryPart);
try (UncheckedCloseable ignoredDropTable = onClose("DROP TABLE " + qualifiedTableName)) {
String expectedTableLocation = ((schemaLocation.endsWith("/") ? schemaLocation : schemaLocation + "/") + tableName)
// Hive normalizes repeated slashes
.replaceAll("(?<!(s3:))/+", "/");

assertThat(getTableFiles(actualTableLocation)).isNotEmpty();
validateDataFiles(partitioned ? "col_int" : "", qualifiedTableName, actualTableLocation);
actualTableLocation = metastore.getTable(schemaName, tableName).orElseThrow().getStorage().getLocation();
assertThat(actualTableLocation).matches(expectedTableLocation);

assertUpdate("DROP TABLE " + qualifiedTableName);
assertThat(getTableFiles(actualTableLocation)).isEmpty();
assertUpdate("INSERT INTO " + qualifiedTableName + " VALUES ('str1', 1), ('str2', 2), ('str3', 3)", 3);
assertQuery("SELECT * FROM " + qualifiedTableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3)");

assertUpdate("DROP SCHEMA " + schemaName);
assertThat(getTableFiles(actualTableLocation)).isNotEmpty();
validateDataFiles(partitioned ? "col_int" : "", qualifiedTableName, actualTableLocation);
}
assertThat(getTableFiles(actualTableLocation)).isEmpty();
}
validateFilesAfterDrop(actualTableLocation);
}

@Override
@Test(dataProvider = "locationPatternsDataProvider")
public void testMergeWithProvidedTableLocation(boolean partitioned, String locationPattern)
public void testMergeWithProvidedTableLocation(boolean partitioned, LocationPattern locationPattern)
{
// Row-level modifications are not supported for Hive tables
}

@Override
public void testOptimizeWithProvidedTableLocation(boolean partitioned, String locationPattern)
public void testOptimizeWithProvidedTableLocation(boolean partitioned, LocationPattern locationPattern)
{
if (locationPattern.contains("double_slash")) {
if (locationPattern == DOUBLE_SLASH || locationPattern == TRIPLE_SLASH || locationPattern == TWO_TRAILING_SLASHES) {
assertThatThrownBy(() -> super.testOptimizeWithProvidedTableLocation(partitioned, locationPattern))
.hasMessageStartingWith("Unsupported location that cannot be internally represented: ")
.hasStackTraceContaining("SQL: CREATE TABLE test_optimize_");
Expand All @@ -197,59 +198,58 @@ public void testOptimizeWithProvidedTableLocation(boolean partitioned, String lo
}

@Test(dataProvider = "locationPatternsDataProvider")
public void testAnalyzeWithProvidedTableLocation(boolean partitioned, String locationPattern)
public void testAnalyzeWithProvidedTableLocation(boolean partitioned, LocationPattern locationPattern)
{
String tableName = "test_analyze_" + randomNameSuffix();
String location = locationPattern.formatted(bucketName, schemaName, tableName);
String location = locationPattern.locationForTable(bucketName, schemaName, tableName);
String partitionQueryPart = (partitioned ? ",partitioned_by = ARRAY['col_int']" : "");

String create = "CREATE TABLE " + tableName + "(col_str, col_int)" +
"WITH (external_location = '" + location + "'" + partitionQueryPart + ") " +
"AS VALUES ('str1', 1), ('str2', 2), ('str3', 3)";
if (locationPattern.contains("double_slash")) {
if (locationPattern == DOUBLE_SLASH || locationPattern == TRIPLE_SLASH || locationPattern == TWO_TRAILING_SLASHES) {
assertQueryFails(create, "\\QUnsupported location that cannot be internally represented: " + location);
return;
}
assertUpdate(create, 3);

assertUpdate("INSERT INTO " + tableName + " VALUES ('str4', 4)", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)");

//Check statistics collection on write
if (partitioned) {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 0.0, 1.0, 0.0, null, null, null),
('col_int', null, 4.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}
else {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 16.0, 3.0, 0.0, null, null, null),
('col_int', null, 3.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
try (UncheckedCloseable ignored = onClose("DROP TABLE " + tableName)) {
assertUpdate("INSERT INTO " + tableName + " VALUES ('str4', 4)", 1);
assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)");

// Check statistics collection on write
if (partitioned) {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 0.0, 1.0, 0.0, null, null, null),
('col_int', null, 4.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}
else {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 16.0, 3.0, 0.0, null, null, null),
('col_int', null, 3.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}

// Check statistics collection explicitly
assertUpdate("ANALYZE " + tableName, 4);

if (partitioned) {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 16.0, 1.0, 0.0, null, null, null),
('col_int', null, 4.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}
else {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 16.0, 4.0, 0.0, null, null, null),
('col_int', null, 4.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}
}

//Check statistics collection explicitly
assertUpdate("ANALYZE " + tableName, 4);

if (partitioned) {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 16.0, 1.0, 0.0, null, null, null),
('col_int', null, 4.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}
else {
assertQuery("SHOW STATS FOR " + tableName, """
VALUES
('col_str', 16.0, 4.0, 0.0, null, null, null),
('col_int', null, 4.0, 0.0, null, 1, 4),
(null, null, null, null, 4.0, null, null)""");
}

assertUpdate("DROP TABLE " + tableName);
}

@Test
Expand Down Expand Up @@ -285,15 +285,15 @@ public void testCreateSchemaWithIncorrectLocation()
String qualifiedTableName = schemaName + "." + tableName;

assertUpdate("CREATE SCHEMA " + schemaName + " WITH (location = '" + schemaLocation + "')");
assertThat(getSchemaLocation(schemaName)).isEqualTo(schemaLocation);
try (UncheckedCloseable ignored = onClose("DROP SCHEMA " + schemaName)) {
assertThat(getSchemaLocation(schemaName)).isEqualTo(schemaLocation);

assertThatThrownBy(() -> assertUpdate("CREATE TABLE " + qualifiedTableName + "(col_str, col_int) AS VALUES ('str1', 1)"))
.hasMessageContaining("Fragment is not allowed in a file system location");
assertThatThrownBy(() -> assertUpdate("CREATE TABLE " + qualifiedTableName + "(col_str, col_int) AS VALUES ('str1', 1)"))
.hasMessageContaining("Fragment is not allowed in a file system location");

assertThatThrownBy(() -> assertUpdate("CREATE TABLE " + qualifiedTableName + "(col_str varchar, col_int integer)"))
.hasMessageContaining("Fragment is not allowed in a file system location");

assertUpdate("DROP SCHEMA " + schemaName);
assertThatThrownBy(() -> assertUpdate("CREATE TABLE " + qualifiedTableName + "(col_str varchar, col_int integer)"))
.hasMessageContaining("Fragment is not allowed in a file system location");
}
}

@Test
Expand All @@ -304,8 +304,8 @@ public void testSchemaNameEscape()
String tableName = "test_table_schema_escaped_" + randomNameSuffix();

assertUpdate("CREATE SCHEMA \"%2$s\" WITH (location = 's3://%1$s/%2$s')".formatted(bucketName, schemaName));
assertQueryFails("CREATE TABLE \"" + schemaName + "\"." + tableName + " (col) AS VALUES 1", "Failed checking path: .*");

assertUpdate("DROP SCHEMA \"" + schemaName + "\"");
try (UncheckedCloseable ignored = onClose("DROP SCHEMA \"" + schemaName + "\"")) {
assertQueryFails("CREATE TABLE \"" + schemaName + "\"." + tableName + " (col) AS VALUES 1", "Failed checking path: .*");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.trino.testing.QueryRunner;
import org.apache.iceberg.FileFormat;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;

import java.util.List;
Expand Down Expand Up @@ -72,11 +71,10 @@ public class TestIcebergGlueCatalogConnectorSmokeTest
private final AWSGlueAsync glueClient;
private final TrinoFileSystemFactory fileSystemFactory;

@Parameters("s3.bucket")
public TestIcebergGlueCatalogConnectorSmokeTest(String bucketName)
public TestIcebergGlueCatalogConnectorSmokeTest()
{
super(FileFormat.PARQUET);
this.bucketName = requireNonNull(bucketName, "bucketName is null");
this.bucketName = requireNonNull(System.getenv("S3_BUCKET"), "Environment S3_BUCKET was not set");
this.schemaName = "test_iceberg_smoke_" + randomNameSuffix();
glueClient = AWSGlueAsyncClientBuilder.defaultClient();

Expand Down
Loading