Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion docs/src/main/sphinx/connector/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,25 @@ properties:
- Nessie API endpoint URI (required).
Example: ``https://localhost:19120/api/v1``
* - ``iceberg.nessie-catalog.ref``
- The branch/tag to use for Nessie, defaults to ``main``.
- The branch/tag to use for Nessie. Defaults to ``main``.
* - ``iceberg.nessie-catalog.default-warehouse-dir``
- Default warehouse directory for schemas created without an explicit
``location`` property. Example: ``/tmp``
* - ``iceberg.nessie-catalog.read-timeout``
- The read timeout :ref:`duration <prop-type-duration>` for requests
to the Nessie server. Defaults to ``25s``.
* - ``iceberg.nessie-catalog.connection-timeout``
- The connection timeout :ref:`duration <prop-type-duration>` for
connection requests to the Nessie server. Defaults to ``5s``.
* - ``iceberg.nessie-catalog.enable-compression``
- Configure whether compression should be enabled or not for
requests to the Nessie server. Defaults to ``true``.
* - ``iceberg.nessie-catalog.authentication.type``
- The authentication type to use.
Available value is ``BEARER``. Defaults to no authentication.
* - ``iceberg.nessie-catalog.authentication.token``
- The token to use with ``BEARER`` authentication.
Example: ``SXVLUXUhIExFQ0tFUiEK``
```

```text
Expand Down
25 changes: 25 additions & 0 deletions plugin/trino-iceberg/pom.xml
Comment thread
ajantha-bhat marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
TODO (https://github.com/trinodb/trino/issues/11294) remove when we upgrade to surefire with https://issues.apache.org/jira/browse/SUREFIRE-1967
-->
<air.test.parallel>instances</air.test.parallel>
<dep.keycloak.version>21.1.2</dep.keycloak.version>
<!-- Nessie version (matching to Iceberg release) must be bumped along with Iceberg version bump to avoid compatibility issues -->
<dep.nessie.version>0.71.0</dep.nessie.version>
</properties>
Expand Down Expand Up @@ -577,6 +578,30 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.keycloak</groupId>
<artifactId>keycloak-admin-client-jakarta</artifactId>
<version>${dep.keycloak.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-jaxb-provider</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.spec.javax.ws.rs</groupId>
<artifactId>jboss-jaxrs-api_3.0_spec</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.keycloak</groupId>
<artifactId>keycloak-core</artifactId>
<version>${dep.keycloak.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Comment thread
ajantha-bhat marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,37 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.net.URI;
import java.util.Optional;

import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieCatalogConfig.Security.BEARER;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.function.Predicate.isEqual;
import static org.projectnessie.client.NessieConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
import static org.projectnessie.client.NessieConfigConstants.DEFAULT_READ_TIMEOUT_MILLIS;

public class IcebergNessieCatalogConfig
{
public enum Security
{
BEARER,
}

private String defaultReferenceName = "main";
private String defaultWarehouseDir;
private URI serverUri;
private Duration readTimeout = new Duration(DEFAULT_READ_TIMEOUT_MILLIS, MILLISECONDS);
private Duration connectionTimeout = new Duration(DEFAULT_CONNECT_TIMEOUT_MILLIS, MILLISECONDS);
private boolean enableCompression = true;
private Security security;
private Optional<String> bearerToken = Optional.empty();

@NotNull
public String getDefaultReferenceName()
Expand Down Expand Up @@ -67,4 +88,84 @@ public IcebergNessieCatalogConfig setDefaultWarehouseDir(String defaultWarehouse
this.defaultWarehouseDir = defaultWarehouseDir;
return this;
}

@MinDuration("1ms")
public Duration getReadTimeout()
{
return readTimeout;
}

@Config("iceberg.nessie-catalog.read-timeout")
Comment thread
ajantha-bhat marked this conversation as resolved.
Outdated
@ConfigDescription("The read timeout for the client.")
public IcebergNessieCatalogConfig setReadTimeout(Duration readTimeout)
{
this.readTimeout = readTimeout;
return this;
}

@MinDuration("1ms")
public Duration getConnectionTimeout()
{
return connectionTimeout;
}

@Config("iceberg.nessie-catalog.connection-timeout")
@ConfigDescription("The connection timeout for the client.")
public IcebergNessieCatalogConfig setConnectionTimeout(Duration connectionTimeout)
{
this.connectionTimeout = connectionTimeout;
return this;
}

public boolean isCompressionEnabled()
{
return enableCompression;
}

@Config("iceberg.nessie-catalog.enable-compression")
@ConfigDescription("Configure whether compression should be enabled or not.")
public IcebergNessieCatalogConfig setCompressionEnabled(boolean enableCompression)
{
this.enableCompression = enableCompression;
return this;
}

public Optional<Security> getSecurity()
{
return Optional.ofNullable(security);
}

@Config("iceberg.nessie-catalog.authentication.type")
@ConfigDescription("The authentication type to use")
public IcebergNessieCatalogConfig setSecurity(Security security)
{
this.security = security;
return this;
}

public Optional<String> getBearerToken()
{
return bearerToken;
}

@Config("iceberg.nessie-catalog.authentication.token")
@ConfigDescription("The token to use with BEARER authentication")
@ConfigSecuritySensitive
public IcebergNessieCatalogConfig setBearerToken(String token)
{
this.bearerToken = Optional.ofNullable(token);
return this;
}

@AssertTrue(message = "'iceberg.nessie-catalog.authentication.token' must be configured only with 'iceberg.nessie-catalog.authentication.type' BEARER")
public boolean isTokenConfiguredWithoutType()
{
return getSecurity().filter(isEqual(BEARER)).isPresent() || getBearerToken().isEmpty();
}

@AssertTrue(message = "'iceberg.nessie-catalog.authentication.token' must be configured with 'iceberg.nessie-catalog.authentication.type' BEARER")
public boolean isMissingTokenForBearerAuth()
{
return getSecurity().filter(isEqual(BEARER)).isEmpty() || getBearerToken().isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.auth.BearerAuthenticationProvider;
import org.projectnessie.client.http.HttpClientBuilder;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.lang.Math.toIntExact;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergNessieCatalogModule
Expand All @@ -45,10 +47,16 @@ protected void setup(Binder binder)
@Singleton
public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig icebergNessieCatalogConfig)
{
return new NessieIcebergClient(
HttpClientBuilder.builder()
.withUri(icebergNessieCatalogConfig.getServerUri())
.build(NessieApiV1.class),
HttpClientBuilder builder = HttpClientBuilder.builder()
.withUri(icebergNessieCatalogConfig.getServerUri())
.withDisableCompression(!icebergNessieCatalogConfig.isCompressionEnabled())
.withReadTimeout(toIntExact(icebergNessieCatalogConfig.getReadTimeout().toMillis()))
.withConnectionTimeout(toIntExact(icebergNessieCatalogConfig.getConnectionTimeout().toMillis()));

icebergNessieCatalogConfig.getBearerToken()
.ifPresent(token -> builder.withAuthentication(BearerAuthenticationProvider.create(token)));

return new NessieIcebergClient(builder.build(NessieApiV1.class),
icebergNessieCatalogConfig.getDefaultReferenceName(),
null,
ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,59 @@ public void testNessieCatalog()
.shutdown();
}

@Test
Comment thread
ajantha-bhat marked this conversation as resolved.
Outdated
public void testNessieCatalogWithBearerAuth()
{
ConnectorFactory factory = getConnectorFactory();

factory.create(
"test",
Map.of(
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
"iceberg.nessie-catalog.uri", "http://foo:1234",
"iceberg.nessie-catalog.authentication.type", "BEARER",
"iceberg.nessie-catalog.authentication.token", "someToken"),
new TestingConnectorContext())
.shutdown();
}

@Test
public void testNessieCatalogWithNoAuthAndAccessToken()
{
ConnectorFactory factory = getConnectorFactory();

assertThatThrownBy(() -> factory.create(
"test",
Map.of(
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.uri", "nessieUri",
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
"iceberg.nessie-catalog.authentication.token", "someToken"),
new TestingConnectorContext())
.shutdown())
.isInstanceOf(ApplicationConfigurationException.class)
.hasMessageContaining("'iceberg.nessie-catalog.authentication.token' must be configured only with 'iceberg.nessie-catalog.authentication.type' BEARER");
}

@Test
public void testNessieCatalogWithNoAccessToken()
{
ConnectorFactory factory = getConnectorFactory();

assertThatThrownBy(() -> factory.create(
"test",
Map.of(
"iceberg.catalog.type", "nessie",
"iceberg.nessie-catalog.uri", "nessieUri",
"iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
"iceberg.nessie-catalog.authentication.type", "BEARER"),
new TestingConnectorContext())
.shutdown())
.isInstanceOf(ApplicationConfigurationException.class)
.hasMessageContaining("'iceberg.nessie-catalog.authentication.token' must be configured with 'iceberg.nessie-catalog.authentication.type' BEARER");
}

private static ConnectorFactory getConnectorFactory()
{
return getOnlyElement(new IcebergPlugin().getConnectorFactories());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
package io.trino.plugin.iceberg.catalog.nessie;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.projectnessie.client.NessieConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
import static org.projectnessie.client.NessieConfigConstants.DEFAULT_READ_TIMEOUT_MILLIS;

public class TestIcebergNessieCatalogConfig
{
Expand All @@ -31,7 +36,12 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(IcebergNessieCatalogConfig.class)
.setDefaultWarehouseDir(null)
.setServerUri(null)
.setDefaultReferenceName("main"));
.setDefaultReferenceName("main")
.setCompressionEnabled(true)
.setConnectionTimeout(new Duration(DEFAULT_CONNECT_TIMEOUT_MILLIS, MILLISECONDS))
.setReadTimeout(new Duration(DEFAULT_READ_TIMEOUT_MILLIS, MILLISECONDS))
.setSecurity(null)
.setBearerToken(null));
}

@Test
Expand All @@ -41,12 +51,22 @@ public void testExplicitPropertyMapping()
.put("iceberg.nessie-catalog.default-warehouse-dir", "/tmp")
.put("iceberg.nessie-catalog.uri", "http://localhost:xxx/api/v1")
.put("iceberg.nessie-catalog.ref", "someRef")
.put("iceberg.nessie-catalog.enable-compression", "false")
.put("iceberg.nessie-catalog.connection-timeout", "2s")
.put("iceberg.nessie-catalog.read-timeout", "5m")
.put("iceberg.nessie-catalog.authentication.type", "BEARER")
.put("iceberg.nessie-catalog.authentication.token", "bearerToken")
.buildOrThrow();

IcebergNessieCatalogConfig expected = new IcebergNessieCatalogConfig()
.setDefaultWarehouseDir("/tmp")
.setServerUri(URI.create("http://localhost:xxx/api/v1"))
.setDefaultReferenceName("someRef");
.setDefaultReferenceName("someRef")
.setCompressionEnabled(false)
.setConnectionTimeout(new Duration(2, TimeUnit.SECONDS))
.setReadTimeout(new Duration(5, TimeUnit.MINUTES))
.setSecurity(IcebergNessieCatalogConfig.Security.BEARER)
.setBearerToken("bearerToken");

assertFullMapping(properties, expected);
}
Expand Down
Loading