Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.common.Minio.MINIO_CONTAINER_NAME;
import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
import static org.testcontainers.utility.MountableFile.forHostPath;

Expand All @@ -32,6 +43,8 @@
public class EnvMultinodeMinioDataLake
extends EnvironmentProvider
{
private static final String S3_BUCKET_NAME = "test-bucket";

private final DockerFiles.ResourceProvider configDir;

@Inject
Expand All @@ -44,6 +57,23 @@ public EnvMultinodeMinioDataLake(StandardMultinode standardMultinode, Hadoop had
@Override
public void extendEnvironment(Environment.Builder builder)
{
builder.configureContainer(TESTS, dockerContainer -> {
dockerContainer.withEnv("S3_BUCKET", S3_BUCKET_NAME);
});

// initialize buckets in minio
FileAttribute<Set<PosixFilePermission>> posixFilePermissions = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--"));
Path minioBucketDirectory;
try {
minioBucketDirectory = Files.createTempDirectory("test-bucket-contents", posixFilePermissions);
minioBucketDirectory.toFile().deleteOnExit();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
builder.configureContainer(MINIO_CONTAINER_NAME, container ->
container.withCopyFileToContainer(forHostPath(minioBucketDirectory), "/data/" + S3_BUCKET_NAME));

builder.addConnector("hive", forHostPath(configDir.getPath("hive.properties")));
builder.addConnector(
"delta_lake",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@
import io.trino.tests.product.launcher.env.common.StandardMultinode;
import io.trino.tests.product.launcher.env.common.TestsEnvironment;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.common.Minio.MINIO_CONTAINER_NAME;
import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC;
import static org.testcontainers.utility.MountableFile.forHostPath;

Expand All @@ -32,6 +43,8 @@ public final class EnvMultinodeMinioDataLakeCaching
{
private static final String CONTAINER_TRINO_DELTA_LAKE_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta.properties";
private static final String CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta_non_cached.properties";
private static final String S3_BUCKET_NAME = "test-bucket";

private final DockerFiles.ResourceProvider configDir;

@Inject
Expand All @@ -44,6 +57,23 @@ public EnvMultinodeMinioDataLakeCaching(StandardMultinode standardMultinode, Had
@Override
public void extendEnvironment(Environment.Builder builder)
{
builder.configureContainer(TESTS, dockerContainer -> {
dockerContainer.withEnv("S3_BUCKET", S3_BUCKET_NAME);
});

// initialize buckets in minio
FileAttribute<Set<PosixFilePermission>> posixFilePermissions = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--"));
Path minioBucketDirectory;
try {
minioBucketDirectory = Files.createTempDirectory("test-bucket-contents", posixFilePermissions);
minioBucketDirectory.toFile().deleteOnExit();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
builder.configureContainer(MINIO_CONTAINER_NAME, container ->
container.withCopyFileToContainer(forHostPath(minioBucketDirectory), "/data/" + S3_BUCKET_NAME));

builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES);
builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake-cached/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_PROPERTIES);
builder.configureContainers(container -> container.withTmpFs(ImmutableMap.of("/tmp/cache", "rw")));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=delta_lake
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=delta_lake
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=hive
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=iceberg
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=delta_lake
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ connector.name=hive
hive.metastore.uri=thrift://hadoop-master:9083
hive.non-managed-table-writes-enabled=true
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=hudi
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
connector.name=hudi
hive.metastore.uri=thrift://hadoop-master:9083
fs.native-s3.enabled=true
fs.hadoop.enabled=false
s3.region=us-east-1
s3.aws-access-key=minio-access-key
s3.aws-secret-key=minio-secret-key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.tests.product.deltalake;

import io.airlift.units.Duration;
import io.trino.tempto.BeforeMethodWithContext;
import io.trino.tempto.ProductTest;
import io.trino.tests.product.utils.CachingTestUtils.CacheStats;
import org.testng.annotations.Test;
Expand All @@ -25,12 +26,21 @@
import static io.trino.tests.product.utils.CachingTestUtils.getCacheStats;
import static io.trino.tests.product.utils.QueryAssertions.assertEventually;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

public class TestDeltaLakeAlluxioCaching
extends ProductTest
{
private String bucketName;

@BeforeMethodWithContext
public void setUp()
{
bucketName = requireNonNull(System.getenv("S3_BUCKET"), "Environment variable not set: S3_BUCKET");
}

@Test(groups = {DELTA_LAKE_ALLUXIO_CACHING, PROFILE_SPECIFIC_TESTS})
public void testReadFromCache()
{
Expand All @@ -40,42 +50,49 @@ public void testReadFromCache()

private void testReadFromTable(String tableNameSuffix)
{
String cachedTableName = "delta.default.test_cache_read" + tableNameSuffix;
String nonCachedTableName = "delta_non_cached.default.test_cache_read" + tableNameSuffix;
String cachedSchemaName = "delta.test_caching";
String nonCachedTableName = "delta_non_cached.test_caching.test_cache_read" + tableNameSuffix;

createTestTable(cachedTableName);
try {
onTrino().executeQuery("CREATE SCHEMA " + cachedSchemaName + " WITH (location = 's3://" + bucketName + "/test_delta_cached')");
String cachedTableName = cachedSchemaName + ".test_cache_read" + tableNameSuffix;

CacheStats beforeCacheStats = getCacheStats("delta");
createTestTable(cachedTableName);

long tableSize = (Long) onTrino().executeQuery("SELECT SUM(size) as size FROM (SELECT \"$path\", \"$file_size\" AS size FROM " + nonCachedTableName + " GROUP BY 1, 2)").getOnlyValue();
CacheStats beforeCacheStats = getCacheStats("delta");

assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();
long tableSize = (Long) onTrino().executeQuery("SELECT SUM(size) as size FROM (SELECT \"$path\", \"$file_size\" AS size FROM " + nonCachedTableName + " GROUP BY 1, 2)").getOnlyValue();

assertEventually(
new Duration(20, SECONDS),
() -> {
// first query via caching catalog should fetch external data
CacheStats afterQueryCacheStats = getCacheStats("delta");
assertGreaterThanOrEqual(afterQueryCacheStats.cacheSpaceUsed(), beforeCacheStats.cacheSpaceUsed() + tableSize);
assertGreaterThan(afterQueryCacheStats.externalReads(), beforeCacheStats.externalReads());
assertGreaterThanOrEqual(afterQueryCacheStats.cacheReads(), beforeCacheStats.cacheReads());
});
assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();

assertEventually(
new Duration(10, SECONDS),
() -> {
CacheStats beforeQueryCacheStats = getCacheStats("delta");
assertEventually(
new Duration(20, SECONDS),
() -> {
// first query via caching catalog should fetch external data
CacheStats afterQueryCacheStats = getCacheStats("delta");
assertGreaterThanOrEqual(afterQueryCacheStats.cacheSpaceUsed(), beforeCacheStats.cacheSpaceUsed() + tableSize);
assertGreaterThan(afterQueryCacheStats.externalReads(), beforeCacheStats.externalReads());
assertGreaterThanOrEqual(afterQueryCacheStats.cacheReads(), beforeCacheStats.cacheReads());
});

assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();
assertEventually(
new Duration(10, SECONDS),
() -> {
CacheStats beforeQueryCacheStats = getCacheStats("delta");

// query via caching catalog should read exclusively from cache
CacheStats afterQueryCacheStats = getCacheStats("delta");
assertGreaterThan(afterQueryCacheStats.cacheReads(), beforeQueryCacheStats.cacheReads());
assertThat(afterQueryCacheStats.externalReads()).isEqualTo(beforeQueryCacheStats.externalReads());
assertThat(afterQueryCacheStats.cacheSpaceUsed()).isEqualTo(beforeQueryCacheStats.cacheSpaceUsed());
});
assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();

onTrino().executeQuery("DROP TABLE " + nonCachedTableName);
// query via caching catalog should read exclusively from cache
CacheStats afterQueryCacheStats = getCacheStats("delta");
assertGreaterThan(afterQueryCacheStats.cacheReads(), beforeQueryCacheStats.cacheReads());
assertThat(afterQueryCacheStats.externalReads()).isEqualTo(beforeQueryCacheStats.externalReads());
assertThat(afterQueryCacheStats.cacheSpaceUsed()).isEqualTo(beforeQueryCacheStats.cacheSpaceUsed());
});
}
finally {
onTrino().executeQuery("DROP TABLE IF EXISTS " + nonCachedTableName);
onTrino().executeQuery("DROP SCHEMA IF EXISTS " + cachedSchemaName);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public void testJmxTablesExposedByDeltaLakeConnectorBackedByGlueMetastore()
public void testJmxTablesExposedByDeltaLakeConnectorBackedByThriftMetastore()
{
assertThat(onTrino().executeQuery("SHOW TABLES IN jmx.current LIKE '%name=delta%'")).containsOnly(
row("io.trino.hdfs:name=delta,type=trinofilesystemcachestats"),
row("io.trino.hdfs:name=delta,type=trinohdfsfilesystemstats"),
row("io.trino.plugin.hive.metastore.cache:name=delta,type=cachinghivemetastore"),
row("io.trino.plugin.hive.metastore.thrift:name=delta,type=thrifthivemetastore"),
row("io.trino.plugin.hive:catalog=delta,name=delta,type=fileformatdatasourcestats"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.tests.product.deltalake;

import io.trino.tempto.BeforeMethodWithContext;
import io.trino.tempto.ProductTest;
import io.trino.tests.product.hudi.TestHudiHiveViewsCompatibility;
import io.trino.tests.product.iceberg.TestIcebergHiveViewsCompatibility;
Expand All @@ -24,6 +25,7 @@
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -34,12 +36,20 @@
public class TestHiveAndDeltaLakeCompatibility
extends ProductTest
{
private String bucketName;

@BeforeMethodWithContext
public void setUp()
{
bucketName = requireNonNull(System.getenv("S3_BUCKET"), "Environment variable not set: S3_BUCKET");
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
public void testInformationSchemaColumnsOnPresenceOfHiveView()
{
// use dedicated schema so we control the number and shape of tables
String schemaName = "test_redirect_to_delta_information_schema_columns_schema_" + randomNameSuffix();
onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName);
onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName + " WITH (location = 's3://" + bucketName + "/test_redirect_to_delta')");

String hiveViewName = "delta_schema_columns_hive_view_" + randomNameSuffix();
String hiveViewQualifiedName = format("hive.%s.%s", schemaName, hiveViewName);
Expand All @@ -52,21 +62,29 @@ public void testInformationSchemaColumnsOnPresenceOfHiveView()
}
finally {
onTrino().executeQuery("DROP VIEW IF EXISTS " + hiveViewQualifiedName);
onTrino().executeQuery("DROP SCHEMA " + schemaName);
onTrino().executeQuery("DROP SCHEMA IF EXISTS " + schemaName);
}
}

@Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
public void testUnregisterNotDeltaLakeTable()
{
String schemaName = "test_unregister_not_delta_lake_schema_" + randomNameSuffix();

String baseTableName = "test_unregister_not_delta_table_" + randomNameSuffix();
String hiveTableName = "hive.default." + baseTableName;
String hiveTableName = format("hive.%s.%s", schemaName, baseTableName);

onTrino().executeQuery("CREATE TABLE " + hiveTableName + " AS SELECT 1 a");
try {
onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName + " WITH (location = 's3://" + bucketName + "/test_unregister_to_delta')");

assertThatThrownBy(() -> onTrino().executeQuery("CALL delta.system.unregister_table('default', '" + baseTableName + "')"))
.hasMessageContaining("not a Delta Lake table");
onTrino().executeQuery("CREATE TABLE " + hiveTableName + " AS SELECT 1 a");

onTrino().executeQuery("DROP TABLE " + hiveTableName);
assertThatThrownBy(() -> onTrino().executeQuery(format("CALL delta.system.unregister_table('%s', '%s')", schemaName, baseTableName)))
.hasMessageContaining("not a Delta Lake table");
}
finally {
onTrino().executeQuery("DROP TABLE IF EXISTS " + hiveTableName);
onTrino().executeQuery("DROP SCHEMA IF EXISTS " + schemaName);
}
}
}