Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<!-- TODO: move to runtime scope and remove dependency plugin exclusion below -->
<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
</dependency>

<dependency>
<groupId>io.trino.hive</groupId>
<artifactId>hive-apache</artifactId>
Expand Down Expand Up @@ -175,6 +169,11 @@
<artifactId>jjwt-jackson</artifactId>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down Expand Up @@ -249,6 +248,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.trino.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
Expand All @@ -261,7 +266,6 @@
<scope>runtime</scope>
</dependency>

<!-- used by tests but also needed transitively -->
<dependency>
<groupId>io.airlift</groupId>
<artifactId>node</artifactId>
Expand All @@ -283,14 +287,14 @@
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.1</version>
<version>5.2.1</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
<version>5.1.1</version>
<version>5.2.1</version>
<scope>runtime</scope>
</dependency>

Expand Down Expand Up @@ -538,16 +542,6 @@
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoredNonTestScopedDependencies>
<ignoredDependency>io.trino.hadoop:hadoop-apache</ignoredDependency>
</ignoredNonTestScopedDependencies>
</configuration>
</plugin>

<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
closer.register(fileScanTaskIterable);
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
closer.register(fileScanTaskIterator);
// TODO: Remove when NPE check has been released: https://github.com/trinodb/trino/issues/15372
isFinished();
}

TupleDomain<IcebergColumnHandle> dynamicFilterPredicate = dynamicFilter.getCurrentPredicate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.jdbc.JdbcClientPool;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import java.util.Optional;
import java.util.Map;

import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.CatalogProperties.URI;
Expand All @@ -43,14 +45,10 @@ public class TrinoJdbcCatalogFactory
private final TrinoFileSystemFactory fileSystemFactory;
private final IcebergJdbcClient jdbcClient;
private final String jdbcCatalogName;
private final String connectionUrl;
private final Optional<String> connectionUser;
private final Optional<String> connectionPassword;
private final String defaultWarehouseDir;
private final boolean isUniqueTableLocation;

@GuardedBy("this")
private JdbcCatalog icebergCatalog;
private final Map<String, String> catalogProperties;
private final JdbcClientPool clientPool;

@Inject
public TrinoJdbcCatalogFactory(
Expand All @@ -69,39 +67,42 @@ public TrinoJdbcCatalogFactory(
this.isUniqueTableLocation = requireNonNull(icebergConfig, "icebergConfig is null").isUniqueTableLocation();
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.jdbcCatalogName = jdbcConfig.getCatalogName();
this.connectionUrl = jdbcConfig.getConnectionUrl();
this.connectionUser = jdbcConfig.getConnectionUser();
this.connectionPassword = jdbcConfig.getConnectionPassword();
this.defaultWarehouseDir = jdbcConfig.getDefaultWarehouseDir();

ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
properties.put(URI, jdbcConfig.getConnectionUrl());
properties.put(WAREHOUSE_LOCATION, defaultWarehouseDir);
jdbcConfig.getConnectionUser().ifPresent(user -> properties.put(PROPERTY_PREFIX + "user", user));
jdbcConfig.getConnectionPassword().ifPresent(password -> properties.put(PROPERTY_PREFIX + "password", password));
this.catalogProperties = properties.buildOrThrow();

this.clientPool = new JdbcClientPool(jdbcConfig.getConnectionUrl(), catalogProperties);
}

// Lifecycle hook: closes the shared JdbcClientPool so pooled JDBC
// connections to the Iceberg catalog database are released when this
// factory is torn down (invoked by the container via @PreDestroy).
@PreDestroy
public void shutdown()
{
clientPool.close();
}

@Override
public synchronized TrinoCatalog create(ConnectorIdentity identity)
public TrinoCatalog create(ConnectorIdentity identity)
{
// Reuse JdbcCatalog instance to avoid JDBC connection leaks
if (icebergCatalog == null) {
icebergCatalog = createJdbcCatalog();
}
JdbcCatalog jdbcCatalog = new JdbcCatalog(
config -> new ForwardingFileIo(fileSystemFactory.create(identity)),
config -> clientPool,
false);

jdbcCatalog.initialize(jdbcCatalogName, catalogProperties);

return new TrinoJdbcCatalog(
catalogName,
typeManager,
tableOperationsProvider,
icebergCatalog,
jdbcCatalog,
jdbcClient,
fileSystemFactory,
isUniqueTableLocation,
defaultWarehouseDir);
}

private JdbcCatalog createJdbcCatalog()
{
JdbcCatalog jdbcCatalog = new JdbcCatalog();
ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
properties.put(URI, connectionUrl);
properties.put(WAREHOUSE_LOCATION, defaultWarehouseDir);
connectionUser.ifPresent(user -> properties.put(PROPERTY_PREFIX + "user", user));
connectionPassword.ifPresent(password -> properties.put(PROPERTY_PREFIX + "password", password));
jdbcCatalog.initialize(jdbcCatalogName, properties.buildOrThrow());
return jdbcCatalog;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
package io.trino.plugin.iceberg.catalog.rest;

import com.google.common.collect.ImmutableMap;
import io.trino.hdfs.ConfigurationUtils;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTSessionCatalog;

import javax.annotation.concurrent.GuardedBy;
Expand All @@ -36,6 +38,7 @@
public class TrinoIcebergRestCatalogFactory
implements TrinoCatalogFactory
{
private final TrinoFileSystemFactory fileSystemFactory;
private final CatalogName catalogName;
private final String trinoVersion;
private final URI serverUri;
Expand All @@ -49,12 +52,14 @@ public class TrinoIcebergRestCatalogFactory

@Inject
public TrinoIcebergRestCatalogFactory(
TrinoFileSystemFactory fileSystemFactory,
CatalogName catalogName,
IcebergRestCatalogConfig restConfig,
SecurityProperties securityProperties,
IcebergConfig icebergConfig,
NodeVersion nodeVersion)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
requireNonNull(restConfig, "restConfig is null");
Expand All @@ -77,8 +82,14 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
warehouse.ifPresent(location -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, location));
properties.put("trino-version", trinoVersion);
properties.putAll(securityProperties.get());
RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog();
icebergCatalogInstance.setConf(ConfigurationUtils.getInitialConfiguration());
RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog(
config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(),
(context, config) -> {
ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null)
? ((ConnectorIdentity) context.wrappedIdentity())
: ConnectorIdentity.ofUser("fake");
return new ForwardingFileIo(fileSystemFactory.create(currentIdentity));
});
icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow());

icebergCatalog = icebergCatalogInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc
private SessionCatalog.SessionContext convert(ConnectorSession session)
{
return switch (sessionType) {
case NONE -> SessionCatalog.SessionContext.createEmpty();
case NONE -> new SessionContext(randomUUID().toString(), null, null, ImmutableMap.of(), session.getIdentity());
case USER -> {
String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default"));

Expand All @@ -429,7 +429,7 @@ private SessionCatalog.SessionContext convert(ConnectorSession session)
.put(OAuth2Properties.JWT_TOKEN_TYPE, subjectJwt)
.buildOrThrow();

yield new SessionCatalog.SessionContext(sessionId, session.getUser(), credentials, properties);
yield new SessionCatalog.SessionContext(sessionId, session.getUser(), credentials, properties, session.getIdentity());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5591,7 +5591,7 @@ public void testExpireSnapshots()
.matches("VALUES (BIGINT '3', VARCHAR 'one two')");
List<String> updatedFiles = getAllMetadataFilesFromTableDirectory(tableLocation);
List<Long> updatedSnapshots = getSnapshotIds(tableName);
assertThat(updatedFiles.size()).isEqualTo(initialFiles.size() - 1);
assertThat(updatedFiles.size()).isEqualTo(initialFiles.size() - 2);
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
assertThat(updatedSnapshots.size()).isEqualTo(1);
assertThat(initialSnapshots).containsAll(updatedSnapshots);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void testExpireSnapshotsBatchDeletes()

assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES (VARCHAR 'one', 1), (VARCHAR 'two', 2)");
assertThat(events).hasSize(2);
assertThat(events).hasSize(3);
// if files were deleted in batch there should be only one request id because there was one request only
assertThat(events.stream()
.map(event -> event.responseElements().get("x-amz-request-id"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class DataFileRecord
private final Map<Integer, String> lowerBounds;
private final Map<Integer, String> upperBounds;

@SuppressWarnings("unchecked")
public static DataFileRecord toDataFileRecord(MaterializedRow row)
{
assertEquals(row.getFieldCount(), 14);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ public void testCreateTableAsSelect()
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, OUTPUT_FILE_CREATE_OR_OVERWRITE), 1)
.addCopies(new FileOperation(MANIFEST, OUTPUT_FILE_LOCATION), 1)
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 2)
.addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(METADATA_JSON, OUTPUT_FILE_CREATE), 2) // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats in one commit
.addCopies(new FileOperation(METADATA_JSON, OUTPUT_FILE_LOCATION), 2)
.addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 2)
.addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 2)
.addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1)
.addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(SNAPSHOT, OUTPUT_FILE_CREATE_OR_OVERWRITE), 1)
.addCopies(new FileOperation(SNAPSHOT, OUTPUT_FILE_LOCATION), 2)
.addCopies(new FileOperation(STATS, OUTPUT_FILE_CREATE), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testCreateTableAsSelect()
ImmutableMultiset.builder()
.add(GET_DATABASE)
.add(CREATE_TABLE)
.addCopies(GET_TABLE, 5)
.addCopies(GET_TABLE, 4)
.add(REPLACE_TABLE)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static Catalog backendCatalog(File warehouseLocation)
HdfsContext context = new HdfsContext(connectorSession);

JdbcCatalog catalog = new JdbcCatalog();
catalog.setConf(hdfsEnvironment.getConfiguration(context, new Path(warehouseLocation.getAbsolutePath())));
catalog.setConf((Object) hdfsEnvironment.getConfiguration(context, new Path(warehouseLocation.getAbsolutePath())));
catalog.initialize("backend_jdbc", properties.buildOrThrow());

return catalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private DelegatingRestSessionCatalog() {}

DelegatingRestSessionCatalog(RESTCatalogAdapter adapter, Catalog delegate)
{
super(properties -> adapter);
super(properties -> adapter, null);
this.adapter = requireNonNull(adapter, "adapter is null");
this.delegate = requireNonNull(delegate, "delegate catalog is null");
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<dep.kafka-clients.version>3.3.2</dep.kafka-clients.version>
<dep.casandra.version>4.14.0</dep.casandra.version>
<dep.minio.version>8.4.5</dep.minio.version>
<dep.iceberg.version>1.1.0</dep.iceberg.version>
<dep.iceberg.version>1.2.1</dep.iceberg.version>
<dep.protobuf.version>3.22.2</dep.protobuf.version>
<dep.wire.version>4.5.0</dep.wire.version>
<dep.netty.version>4.1.79.Final</dep.netty.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class EnvSinglenodeSparkIcebergRest
private static final int REST_SERVER_PORT = 8181;
private static final String SPARK_CONTAINER_NAME = "spark";
private static final String REST_CONTAINER_NAME = "iceberg-with-rest";
private static final String REST_SERVER_IMAGE = "tabulario/iceberg-rest:0.3.0";
private static final String REST_SERVER_IMAGE = "tabulario/iceberg-rest:0.4.0";
private static final String CATALOG_WAREHOUSE = "hdfs://hadoop-master:9000/user/hive/warehouse";

private final DockerFiles dockerFiles;
Expand Down