40 changes: 40 additions & 0 deletions lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsConfigurationProvider.java
@@ -0,0 +1,40 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.hdfs;

import org.apache.hadoop.conf.Configuration;

import javax.inject.Inject;

import java.net.URI;

public class HdfsConfigurationProvider
        implements DynamicConfigurationProvider
{
    private final boolean cachingDisabled;

    @Inject
    public HdfsConfigurationProvider(HdfsConfig hdfsConfig)
    {
        this.cachingDisabled = hdfsConfig.getFileSystemMaxCacheSize() == 0;
    }

    @Override
    public void updateConfiguration(Configuration configuration, HdfsContext context, URI uri)
    {
        if (cachingDisabled) {
            configuration.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
        }
    }
}
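The new provider disables Hadoop's process-wide FileSystem cache for the scheme of every URI it sees whenever the configured cache size is 0 (HdfsConfig.getFileSystemMaxCacheSize() == 0). A minimal standalone sketch of what the fs.<scheme>.impl.disable.cache flag does; the FsCacheDemo class and the local "file" scheme are illustrative only, not part of this change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.net.URI;

public class FsCacheDemo
{
    public static void main(String[] args)
            throws Exception
    {
        Configuration conf = new Configuration();
        // The same property HdfsConfigurationProvider sets, applied here to the "file" scheme
        conf.setBoolean("fs.file.impl.disable.cache", true);
        FileSystem first = FileSystem.get(URI.create("file:///tmp"), conf);
        FileSystem second = FileSystem.get(URI.create("file:///tmp"), conf);
        // With caching disabled, each get() returns a fresh instance
        System.out.println(first == second); // prints false
    }
}

Without the flag, both calls would return the same cached instance, keyed by scheme, authority, and user.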
3 changes: 2 additions & 1 deletion lib/trino-hdfs/src/main/java/io/trino/hdfs/HdfsModule.java
@@ -33,6 +33,7 @@ public void configure(Binder binder)

        binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
        newSetBinder(binder, ConfigurationInitializer.class);
-       newSetBinder(binder, DynamicConfigurationProvider.class);
+       newSetBinder(binder, DynamicConfigurationProvider.class).addBinding()
+               .to(HdfsConfigurationProvider.class).in(Scopes.SINGLETON);
    }
}
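With a concrete binding added to the multibinder, Guice now delivers HdfsConfigurationProvider inside the injected DynamicConfigurationProvider set. A sketch of the consuming side, assuming a constructor that receives the set the usual Guice way (the ExampleConsumer class is hypothetical):

import javax.inject.Inject;

import java.util.Set;

public class ExampleConsumer
{
    private final Set<DynamicConfigurationProvider> providers;

    @Inject
    public ExampleConsumer(Set<DynamicConfigurationProvider> providers)
    {
        // Guice populates the set with every addBinding() registration,
        // so the new HdfsConfigurationProvider arrives here automatically
        this.providers = Set.copyOf(providers);
    }
}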
3 changes: 2 additions & 1 deletion plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/file/FileHiveMetastore.java
@@ -28,6 +28,7 @@
import io.trino.hdfs.HdfsConfig;
import io.trino.hdfs.HdfsConfiguration;
import io.trino.hdfs.HdfsConfigurationInitializer;
+import io.trino.hdfs.HdfsConfigurationProvider;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.authentication.NoHdfsAuthentication;
@@ -172,7 +173,7 @@ public class FileHiveMetastore
    public static FileHiveMetastore createTestingFileHiveMetastore(File catalogDirectory)
    {
        HdfsConfig hdfsConfig = new HdfsConfig();
-       HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
+       HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of(new HdfsConfigurationProvider(hdfsConfig)));
        HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
        return new FileHiveMetastore(
                new NodeVersion("testversion"),
3 changes: 2 additions & 1 deletion plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java
@@ -72,6 +72,7 @@
import io.trino.hdfs.HdfsConfig;
import io.trino.hdfs.HdfsConfiguration;
import io.trino.hdfs.HdfsConfigurationInitializer;
+import io.trino.hdfs.HdfsConfigurationProvider;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.hdfs.authentication.NoHdfsAuthentication;
@@ -220,7 +221,7 @@ public GlueHiveMetastore(
    public static GlueHiveMetastore createTestingGlueHiveMetastore(String defaultWarehouseDir)
    {
        HdfsConfig hdfsConfig = new HdfsConfig();
-       HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of());
+       HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(new HdfsConfigurationInitializer(hdfsConfig), ImmutableSet.of(new HdfsConfigurationProvider(hdfsConfig)));
        HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication());
        GlueMetastoreStats stats = new GlueMetastoreStats();
        GlueHiveMetastoreConfig glueConfig = new GlueHiveMetastoreConfig()
12 changes: 10 additions & 2 deletions plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableMap;
+import io.trino.hdfs.ConfigurationInitializer;
import io.trino.hdfs.ConfigurationUtils;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.NodeVersion;
@@ -22,6 +23,7 @@
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType;
import io.trino.spi.security.ConnectorIdentity;
+import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.RESTSessionCatalog;

@@ -30,6 +32,7 @@

import java.net.URI;
import java.util.Optional;
+import java.util.Set;

import static java.util.Objects.requireNonNull;

@@ -43,6 +46,7 @@ public class TrinoIcebergRestCatalogFactory
    private final SessionType sessionType;
    private final SecurityProperties securityProperties;
    private final boolean uniqueTableLocation;
+   private final Set<ConfigurationInitializer> configurationInitializers;

    @GuardedBy("this")
    private RESTSessionCatalog icebergCatalog;
@@ -53,7 +57,8 @@ public TrinoIcebergRestCatalogFactory(
            IcebergRestCatalogConfig restConfig,
            SecurityProperties securityProperties,
            IcebergConfig icebergConfig,
-           NodeVersion nodeVersion)
+           NodeVersion nodeVersion,
+           Set<ConfigurationInitializer> configurationInitializers)
    {
        this.catalogName = requireNonNull(catalogName, "catalogName is null");
        this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
@@ -64,6 +69,7 @@
        this.securityProperties = requireNonNull(securityProperties, "securityProperties is null");
        requireNonNull(icebergConfig, "icebergConfig is null");
        this.uniqueTableLocation = icebergConfig.isUniqueTableLocation();
+       this.configurationInitializers = requireNonNull(configurationInitializers, "configurationInitializers is null");
    }

    @Override
@@ -78,7 +84,9 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
        properties.put("trino-version", trinoVersion);
        properties.putAll(securityProperties.get());
        RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog();
-       icebergCatalogInstance.setConf(ConfigurationUtils.getInitialConfiguration());
+       Configuration configuration = ConfigurationUtils.getInitialConfiguration();
+       configurationInitializers.forEach(configurationInitializer -> configurationInitializer.initializeConfiguration(configuration));
+       icebergCatalogInstance.setConf(configuration);
        icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow());

        icebergCatalog = icebergCatalogInstance;
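The factory now builds the Hadoop Configuration explicitly and applies every injected ConfigurationInitializer to it before handing it to the REST catalog. A sketch of what one member of that set can look like, assuming io.trino.hdfs.ConfigurationInitializer is the single-method interface invoked by the forEach above (the s3Defaults initializer is illustrative):

// Hypothetical initializer: mutates the Configuration in place before
// RESTSessionCatalog.setConf() is called
ConfigurationInitializer s3Defaults = configuration ->
        configuration.setBoolean("fs.s3.impl.disable.cache", true);

In the smoke test below, TrinoS3ConfigurationInitializer plays this role and supplies the MinIO credentials and endpoint.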
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogConnectorSmokeTest.java
@@ -15,37 +15,63 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import io.airlift.http.server.testing.TestingHttpServer;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
+import io.trino.hdfs.ConfigurationInitializer;
+import io.trino.hdfs.DynamicHdfsConfiguration;
+import io.trino.hdfs.HdfsConfig;
+import io.trino.hdfs.HdfsConfiguration;
+import io.trino.hdfs.HdfsConfigurationInitializer;
+import io.trino.hdfs.HdfsConfigurationProvider;
+import io.trino.hdfs.HdfsContext;
+import io.trino.hdfs.HdfsEnvironment;
+import io.trino.hdfs.authentication.NoHdfsAuthentication;
+import io.trino.plugin.hive.s3.HiveS3Config;
+import io.trino.plugin.hive.s3.TrinoS3ConfigurationInitializer;
import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergQueryRunner;
+import io.trino.plugin.iceberg.SchemaInitializer;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
+import io.trino.testing.containers.Minio;
import io.trino.tpch.TpchTable;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.rest.DelegatingRestSessionCatalog;
import org.assertj.core.util.Files;

-import java.io.File;
-import java.nio.file.Path;
-import java.util.Optional;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;

-import static com.google.common.io.MoreFiles.deleteRecursively;
-import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static io.trino.plugin.hive.containers.HiveMinioDataLake.MINIO_DEFAULT_REGION;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
-import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
import static io.trino.tpch.TpchTable.LINE_ITEM;
import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class TestIcebergRestCatalogConnectorSmokeTest
        extends BaseIcebergConnectorSmokeTest
{
-   private File warehouseLocation;
+   private final String bucketName = "iceberg-rest-smoke-test-" + randomNameSuffix();
+   private final String schemaName;
+
+   private Minio minio;
+   private TrinoFileSystemFactory fileSystemFactory;

    public TestIcebergRestCatalogConnectorSmokeTest()
    {
        super(new IcebergConfig().getFileFormat().toIceberg());
+       this.schemaName = "tpch_" + format.name().toLowerCase(ENGLISH);
    }

    @SuppressWarnings("DuplicateBranchesInSwitch")
@@ -64,10 +90,28 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
    protected QueryRunner createQueryRunner()
            throws Exception
    {
-       warehouseLocation = Files.newTemporaryFolder();
-       closeAfterClass(() -> deleteRecursively(warehouseLocation.toPath(), ALLOW_INSECURE));
+       this.minio = closeAfterClass(
+               Minio.builder()
+                       .withEnvVars(ImmutableMap.<String, String>builder()
+                               .put("MINIO_ACCESS_KEY", MINIO_ACCESS_KEY)
+                               .put("MINIO_SECRET_KEY", MINIO_SECRET_KEY)
+                               .put("MINIO_REGION", MINIO_DEFAULT_REGION)
+                               .buildOrThrow())
+                       .build());
+       this.minio.start();
+       this.minio.createBucket(bucketName);
+
+       ConfigurationInitializer s3Config = new TrinoS3ConfigurationInitializer(new HiveS3Config()
+               .setS3AwsAccessKey(MINIO_ACCESS_KEY)
+               .setS3AwsSecretKey(MINIO_SECRET_KEY)
+               .setS3Endpoint("http://" + minio.getMinioApiEndpoint())
+               .setS3PathStyleAccess(true));
+       HdfsConfig hdfsConfig = new HdfsConfig().setFileSystemMaxCacheSize(0);
+       HdfsConfigurationInitializer initializer = new HdfsConfigurationInitializer(hdfsConfig, ImmutableSet.of(s3Config));
+       HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(initializer, ImmutableSet.of(new HdfsConfigurationProvider(hdfsConfig)));
+       this.fileSystemFactory = new HdfsFileSystemFactory(new HdfsEnvironment(hdfsConfiguration, hdfsConfig, new NoHdfsAuthentication()));

-       Catalog backend = backendCatalog(warehouseLocation);
+       Catalog backend = jdbcBackedCatalog(hdfsConfiguration);

        DelegatingRestSessionCatalog delegatingCatalog = DelegatingRestSessionCatalog.builder()
                .delegate(backend)
@@ -78,19 +122,31 @@ protected QueryRunner createQueryRunner()
        closeAfterClass(testServer::stop);

        return IcebergQueryRunner.builder()
-               .setBaseDataDir(Optional.of(warehouseLocation.toPath()))
                .setIcebergProperties(
                        ImmutableMap.<String, String>builder()
+                               // Disabling the fs cache is necessary to demonstrate that some FileSystems are not initialized properly
+                               .put("hive.fs.cache.max-size", "0")
                                .put("iceberg.file-format", format.name())
-                               .put("iceberg.catalog.type", "rest")
+                               .put("iceberg.catalog.type", "REST")
                                .put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString())
+                               .put("iceberg.rest-catalog.warehouse", "s3://" + bucketName)
+                               .put("hive.s3.aws-access-key", MINIO_ACCESS_KEY)
+                               .put("hive.s3.aws-secret-key", MINIO_SECRET_KEY)
+                               .put("hive.s3.endpoint", "http://" + minio.getMinioApiEndpoint())
+                               .put("hive.s3.path-style-access", "true")
+                               .put("hive.s3.streaming.part-size", "5MB")
                                .put("iceberg.register-table-procedure.enabled", "true")
                                .put("iceberg.writer-sort-buffer-size", "1MB")
                                .buildOrThrow())
-               .setInitialTables(ImmutableList.<TpchTable<?>>builder()
-                       .addAll(REQUIRED_TPCH_TABLES)
-                       .add(LINE_ITEM)
-                       .build())
+               .setSchemaInitializer(
+                       SchemaInitializer.builder()
+                               .withSchemaName(schemaName)
+                               .withClonedTpchTables(ImmutableList.<TpchTable<?>>builder()
+                                       .addAll(REQUIRED_TPCH_TABLES)
+                                       .add(LINE_ITEM)
+                                       .build())
+                               .withSchemaProperties(Map.of("location", "'s3://" + bucketName + "/" + schemaName + "'"))
+                               .build())
                .build();
    }

@@ -131,13 +187,14 @@ protected String getMetadataLocation(String tableName)
    @Override
    protected String schemaPath()
    {
-       return format("%s/%s", warehouseLocation, getSession().getSchema());
+       return format("s3://%s/%s", bucketName, schemaName);
    }

    @Override
    protected boolean locationExists(String location)
    {
-       return java.nio.file.Files.exists(Path.of(location));
+       String prefix = "s3://" + bucketName + "/";
+       return !minio.createMinioClient().listObjects(bucketName, location.substring(prefix.length())).isEmpty();
    }

@Override
@@ -220,12 +277,30 @@ public void testRepeatUnregisterTable()
    @Override
    protected boolean isFileSorted(String path, String sortColumnName)
    {
-       return checkOrcFileSorting(path, sortColumnName);
+       return checkOrcFileSorting(fileSystemFactory, path, sortColumnName);
    }

    @Override
    protected void deleteDirectory(String location)
    {
        // used when unregistering a table, which is not supported by the REST catalog
    }
+
+   private Catalog jdbcBackedCatalog(HdfsConfiguration hdfsConfiguration)
+           throws URISyntaxException
+   {
+       String warehouseLocation = "s3://" + bucketName;
+
+       ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+       properties.put(CatalogProperties.URI, "jdbc:h2:file:" + Files.newTemporaryFile().getAbsolutePath());
+       properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+       properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+       properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
+
+       JdbcCatalog catalog = new JdbcCatalog();
+       catalog.setConf(hdfsConfiguration.getConfiguration(new HdfsContext(SESSION.getIdentity()), new URI(warehouseLocation)));
+       catalog.initialize("backend_jdbc", properties.buildOrThrow());
+
+       return catalog;
+   }
}
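Taken together, the test now exercises the REST catalog against MinIO with the FileSystem cache disabled end to end. A hypothetical unit-style check of the new provider, reusing only constructs that appear in this diff (the check itself is not part of the change):

HdfsConfig hdfsConfig = new HdfsConfig().setFileSystemMaxCacheSize(0);
HdfsConfigurationProvider provider = new HdfsConfigurationProvider(hdfsConfig);
Configuration configuration = new Configuration(false);
provider.updateConfiguration(configuration, new HdfsContext(SESSION.getIdentity()), URI.create("s3://bucket/key"));
// The provider marked the "s3" scheme, so this prints true
System.out.println(configuration.getBoolean("fs.s3.impl.disable.cache", false));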