Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static IcebergFileFormat fromIceberg(FileFormat format)
case AVRO -> AVRO;
// Not used as a data file format
case METADATA -> throw new IllegalArgumentException("Unexpected METADATA file format");
case PUFFIN -> throw new IllegalArgumentException("Unexpected PUFFIN file format");
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(
INCREMENTAL_UPDATE,
collectedStatistics);
transaction.updateStatistics()
.setStatistics(newSnapshotId, statisticsFile)
.setStatistics(statisticsFile)
.commit();

commitTransaction(transaction, "update statistics on insert");
Expand Down Expand Up @@ -1986,7 +1986,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
StatisticsFile newStatsFile = tableStatisticsWriter.rewriteStatisticsFile(session, reloadedTable, newSnapshotId);

transaction.updateStatistics()
.setStatistics(newSnapshotId, newStatsFile)
.setStatistics(newStatsFile)
.commit();
commitTransaction(transaction, "update statistics after optimize");
}
Expand Down Expand Up @@ -2931,7 +2931,7 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
REPLACE,
collectedStatistics);
transaction.updateStatistics()
.setStatistics(snapshotId, statisticsFile)
.setStatistics(statisticsFile)
.commit();

commitTransaction(transaction, "statistics collection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public static Object getValue(JsonNode partitionValue, Type type)
createDecimalType(decimalType.precision(), decimalType.scale()));
// TODO https://github.com/trinodb/trino/issues/19753 Support Iceberg timestamp types with nanosecond precision
case TIMESTAMP_NANO:
// TODO https://github.com/trinodb/trino/issues/24538 Support variant type
case VARIANT:
case UNKNOWN:
case LIST:
case MAP:
case STRUCT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public static Type toTrinoType(org.apache.iceberg.types.Type type, TypeManager t
return RowType.from(fields.stream()
.map(field -> new RowType.Field(Optional.of(field.name()), toTrinoType(field.type(), typeManager)))
.collect(toImmutableList()));
case VARIANT:
// TODO https://github.com/trinodb/trino/issues/24538 Support variant type
break;
case UNKNOWN:
break;
}
throw new UnsupportedOperationException(format("Cannot convert from Iceberg type '%s' (%s) to Trino type", type, type.typeId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ private static String convertToTypeString(Type type)
case TIMESTAMP_NANO -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: TIMESTAMP_NANO");
case FIXED, BINARY -> "binary";
case DECIMAL -> "decimal(%s,%s)".formatted(((DecimalType) type).precision(), ((DecimalType) type).scale());
case UNKNOWN -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: UNKNOWN");
// TODO https://github.com/trinodb/trino/issues/24538 Support variant type
case VARIANT -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: VARIANT");
case LIST -> "array<%s>".formatted(convert(type.asListType().elementType()));
case MAP -> "map<%s,%s>".formatted(convert(type.asMapType().keyType()), convert(type.asMapType().valueType()));
case STRUCT -> "struct<%s>".formatted(type.asStructType().fields().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ private static List<OrcType> toOrcType(int nextFieldTypeIndex, Type type, Map<St
.buildOrThrow();
yield ImmutableList.of(new OrcType(OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
}
case VARIANT -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: VARIANT");
case UNKNOWN -> throw new TrinoException(NOT_SUPPORTED, "Unsupported Iceberg type: UNKNOWN");
case STRUCT -> toOrcStructType(nextFieldTypeIndex, (StructType) type, attributes);
case LIST -> toOrcListType(nextFieldTypeIndex, (ListType) type, attributes);
case MAP -> toOrcMapType(nextFieldTypeIndex, (MapType) type, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testAnalyzeWithSchemaEvolution()
double infoDataSize = (double) computeActual("SHOW STATS FOR " + tableName).getMaterializedRows().stream()
.filter(row -> "info".equals(row.getField(0)))
.collect(onlyElement()).getField(1);
assertThat(infoDataSize).isBetween(2000.0, 4000.0);
assertThat(infoDataSize).isBetween(2000.0, 5000.0);
assertQuery(
"SHOW STATS FOR " + tableName,
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

@Isolated // TODO remove
@TestInstance(PER_CLASS)
@Disabled("https://github.com/trinodb/trino/issues/24945 Reenable after Polaris supports Iceberg versions >= 1.8.0")
final class TestIcebergPolarisCatalogConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand All @@ -34,6 +35,7 @@
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@Disabled("https://github.com/trinodb/trino/issues/24945 Reenable after Unity supports Iceberg versions >= 1.8.0")
final class TestIcebergUnityRestCatalogConnectorSmokeTest
extends BaseIcebergConnectorSmokeTest
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ protected QueryRunner createQueryRunner()
throws Exception
{
Network network = Network.newNetwork();
minio = closeAfterClass(Minio.builder().withNetwork(network).build());
// Use new MinIO version because Iceberg enables strong integrity checks
minio = closeAfterClass(Minio.builder().withImage("minio/minio:RELEASE.2025-01-20T14-49-07Z").withNetwork(network).build());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will break Iceberg in storage like ECS. Can we force Iceberg not to use strong integrity checks?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minio.start();
minio.createBucket(bucketName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.io.CharStreams;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
import org.apache.iceberg.rest.RESTCatalogAdapter.Route;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.util.Pair;
Expand Down Expand Up @@ -98,15 +98,19 @@ protected void execute(ServletRequestContext context, HttpServletResponse respon
return;
}

HTTPRequest request = restCatalogAdapter.buildRequest(
context.method(),
context.path(),
context.queryParams(),
context.headers(),
context.body());

try {
Object responseBody = restCatalogAdapter.execute(
context.method(),
context.path(),
context.queryParams(),
context.body(),
request,
context.route().responseClass(),
context.headers(),
handle(response));
handle(response),
_ -> {});

if (responseBody != null) {
RESTObjectMapper.mapper().writeValue(response.getWriter(), responseBody);
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@
<dep.aws-sdk.version>1.12.780</dep.aws-sdk.version>
<dep.cassandra.version>4.17.0</dep.cassandra.version>
<dep.confluent.version>7.7.1</dep.confluent.version>
<dep.docker.images.version>108</dep.docker.images.version>
<dep.docker.images.version>109</dep.docker.images.version>
<dep.drift.version>1.22</dep.drift.version>
<dep.flyway.version>11.3.1</dep.flyway.version>
<dep.frontend-maven-plugin.version>1.15.1</dep.frontend-maven-plugin.version>
Expand All @@ -198,7 +198,7 @@
<dep.gib.version>4.5.4</dep.gib.version>
<dep.google.http.client.version>1.46.1</dep.google.http.client.version>
<dep.httpcore5.version>5.3.3</dep.httpcore5.version>
<dep.iceberg.version>1.7.1</dep.iceberg.version>
<dep.iceberg.version>1.8.0</dep.iceberg.version>
<dep.jna.version>5.16.0</dep.jna.version>
<dep.joda.version>2.13.1</dep.joda.version>
<dep.jsonwebtoken.version>0.12.6</dep.jsonwebtoken.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto;
import static io.trino.tests.product.launcher.env.EnvironmentDefaults.HADOOP_BASE_IMAGE;
import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D;
import static java.util.Objects.requireNonNull;
Expand All @@ -48,6 +49,7 @@ public class EnvSinglenodeSparkIceberg
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String hadoopImagesVersion;
private final DockerFiles.ResourceProvider configDir;

@Inject
public EnvSinglenodeSparkIceberg(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder)
Expand All @@ -56,6 +58,7 @@ public EnvSinglenodeSparkIceberg(Standard standard, Hadoop hadoop, DockerFiles d
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
this.hadoopImagesVersion = config.getHadoopImagesVersion();
this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/singlenode-spark-iceberg");
}

@Override
Expand All @@ -76,6 +79,8 @@ public void extendEnvironment(Environment.Builder builder)
builder.configureContainer(TESTS, dockerContainer -> dockerContainer
// Binding instead of copying for avoiding OutOfMemoryError https://github.com/testcontainers/testcontainers-java/issues/2863
.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

configureTempto(builder, configDir);
}

@SuppressWarnings("resource")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto;
import static io.trino.tests.product.launcher.env.EnvironmentDefaults.HADOOP_BASE_IMAGE;
import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_HADOOP_INIT_D;
import static java.util.Objects.requireNonNull;
Expand All @@ -50,6 +51,7 @@ public class EnvSinglenodeSparkIcebergJdbcCatalog
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String imagesVersion;
private final DockerFiles.ResourceProvider configDir;

@Inject
public EnvSinglenodeSparkIcebergJdbcCatalog(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder)
Expand All @@ -58,6 +60,7 @@ public EnvSinglenodeSparkIcebergJdbcCatalog(Standard standard, Hadoop hadoop, Do
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
this.imagesVersion = requireNonNull(config, "config is null").getImagesVersion();
this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/singlenode-spark-iceberg-jdbc-catalog");
}

@Override
Expand All @@ -80,6 +83,8 @@ public void extendEnvironment(Environment.Builder builder)
builder.configureContainer(TESTS, dockerContainer -> dockerContainer
// Binding instead of copying for avoiding OutOfMemoryError https://github.com/testcontainers/testcontainers-java/issues/2863
.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

configureTempto(builder, configDir);
}

@SuppressWarnings("resource")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.TESTS;
import static io.trino.tests.product.launcher.env.EnvironmentContainers.configureTempto;
import static java.util.Objects.requireNonNull;
import static org.testcontainers.utility.MountableFile.forHostPath;

Expand All @@ -49,6 +50,7 @@ public class EnvSinglenodeSparkIcebergNessie
private final DockerFiles dockerFiles;
private final PortBinder portBinder;
private final String hadoopImagesVersion;
private final DockerFiles.ResourceProvider configDir;

@Inject
public EnvSinglenodeSparkIcebergNessie(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder)
Expand All @@ -57,6 +59,7 @@ public EnvSinglenodeSparkIcebergNessie(Standard standard, Hadoop hadoop, DockerF
this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
this.portBinder = requireNonNull(portBinder, "portBinder is null");
this.hadoopImagesVersion = requireNonNull(config, "config is null").getHadoopImagesVersion();
this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/singlenode-spark-iceberg-nessie");
}

@Override
Expand All @@ -70,6 +73,8 @@ public void extendEnvironment(Environment.Builder builder)
builder.configureContainer(TESTS, dockerContainer -> dockerContainer
// Binding instead of copying for avoiding OutOfMemoryError https://github.com/testcontainers/testcontainers-java/issues/2863
.withFileSystemBind(HIVE_JDBC_PROVIDER.getParent(), "/docker/jdbc", BindMode.READ_ONLY));

configureTempto(builder, configDir);
}

@SuppressWarnings("resource")
Expand Down
Loading
Loading