Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type sour

private static ColumnTransform identity(Type type)
{
return new ColumnTransform(type, false, true, Function.identity(), ValueTransform.identity(type));
return new ColumnTransform(type, false, true, false, Function.identity(), ValueTransform.identity(type));
Comment thread
findepi marked this conversation as resolved.
Outdated
}

@VisibleForTesting
Expand All @@ -174,6 +174,7 @@ static ColumnTransform bucket(Type type, int count)
INTEGER,
false,
false,
false,
block -> bucketBlock(block, count, hasher),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -230,6 +231,7 @@ private static ColumnTransform yearsFromDate()
INTEGER,
false,
true,
true,
block -> transformBlock(DATE, INTEGER, block, transform),
ValueTransform.from(DATE, transform));
}
Expand All @@ -241,6 +243,7 @@ private static ColumnTransform monthsFromDate()
INTEGER,
false,
true,
true,
block -> transformBlock(DATE, INTEGER, block, transform),
ValueTransform.from(DATE, transform));
}
Expand All @@ -252,6 +255,7 @@ private static ColumnTransform daysFromDate()
INTEGER,
false,
true,
true,
block -> transformBlock(DATE, INTEGER, block, transform),
ValueTransform.from(DATE, transform));
}
Expand All @@ -263,6 +267,7 @@ private static ColumnTransform yearsFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -274,6 +279,7 @@ private static ColumnTransform monthsFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -285,6 +291,7 @@ private static ColumnTransform daysFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -296,6 +303,7 @@ private static ColumnTransform hoursFromTimestamp()
INTEGER,
false,
true,
true,
block -> transformBlock(TIMESTAMP_MICROS, INTEGER, block, transform),
ValueTransform.from(TIMESTAMP_MICROS, transform));
}
Expand All @@ -307,6 +315,7 @@ private static ColumnTransform yearsFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand All @@ -318,6 +327,7 @@ private static ColumnTransform monthsFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand All @@ -329,6 +339,7 @@ private static ColumnTransform daysFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand All @@ -340,6 +351,7 @@ private static ColumnTransform hoursFromTimestampWithTimeZone()
INTEGER,
false,
true,
true,
block -> extractTimestampWithTimeZone(block, transform),
ValueTransform.fromTimestampTzTransform(transform));
}
Expand Down Expand Up @@ -453,6 +465,7 @@ private static ColumnTransform truncateInteger(int width)
INTEGER,
false,
true,
false,
block -> truncateInteger(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -487,6 +500,7 @@ private static ColumnTransform truncateBigint(int width)
BIGINT,
false,
true,
false,
block -> truncateBigint(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -522,6 +536,7 @@ private static ColumnTransform truncateShortDecimal(Type type, int width, Decima
type,
false,
true,
false,
block -> truncateShortDecimal(decimal, block, unscaledWidth),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -559,6 +574,7 @@ private static ColumnTransform truncateLongDecimal(Type type, int width, Decimal
type,
false,
true,
false,
block -> truncateLongDecimal(decimal, block, unscaledWidth),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -606,6 +622,7 @@ private static ColumnTransform truncateVarchar(int width)
VARCHAR,
false,
true,
false,
block -> truncateVarchar(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -647,6 +664,7 @@ private static ColumnTransform truncateVarbinary(int width)
VARBINARY,
false,
true,
false,
block -> truncateVarbinary(block, width),
(block, position) -> {
if (block.isNull(position)) {
Expand Down Expand Up @@ -685,6 +703,7 @@ private static ColumnTransform voidTransform(Type type)
type,
true,
true,
false,
block -> RunLengthEncodedBlock.create(nullBlock, block.getPositionCount()),
(block, position) -> null);
}
Expand Down Expand Up @@ -739,14 +758,16 @@ public static class ColumnTransform
private final Type type;
private final boolean preservesNonNull;
private final boolean monotonic;
private final boolean temporal;
private final Function<Block, Block> blockTransform;
private final ValueTransform valueTransform;

public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, Function<Block, Block> blockTransform, ValueTransform valueTransform)
public ColumnTransform(Type type, boolean preservesNonNull, boolean monotonic, boolean temporal, Function<Block, Block> blockTransform, ValueTransform valueTransform)
{
this.type = requireNonNull(type, "type is null");
this.preservesNonNull = preservesNonNull;
this.monotonic = monotonic;
this.temporal = temporal;
this.blockTransform = requireNonNull(blockTransform, "transform is null");
this.valueTransform = requireNonNull(valueTransform, "valueTransform is null");
}
Expand All @@ -769,6 +790,11 @@ public boolean isMonotonic()
return monotonic;
}

public boolean isTemporal()
{
return temporal;
}

public Function<Block, Block> getBlockTransform()
{
return blockTransform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.plugin.iceberg.ColumnIdentity;
import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition;
import io.trino.plugin.iceberg.IcebergUtil;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ColumnMetadata;
Expand All @@ -48,6 +49,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.types.Types;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -56,25 +58,35 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT;
import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG;
import static io.trino.plugin.hive.metastore.glue.converter.GlueToTrinoConverter.mappedCopy;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.STORAGE_SCHEMA;
import static io.trino.plugin.iceberg.IcebergMaterializedViewAdditionalProperties.getStorageSchema;
import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergUtil.commit;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromMetadata;
import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.trino.plugin.iceberg.PartitionTransforms.getColumnTransform;
import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
Expand Down Expand Up @@ -217,9 +229,46 @@ protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession se

String storageSchema = getStorageSchema(definition.getProperties()).orElse(viewName.getSchemaName());
SchemaTableName storageTable = new SchemaTableName(storageSchema, storageTableName);
List<ColumnMetadata> columns = definition.getColumns().stream()
.map(column -> new ColumnMetadata(column.getName(), typeForMaterializedViewStorageTable(typeManager.getType(column.getType()))))
.collect(toImmutableList());

Schema schemaWithTimestampTzPreserved = schemaFromMetadata(mappedCopy(
definition.getColumns(),
column -> {
Type type = typeManager.getType(column.getType());
if (type instanceof TimestampWithTimeZoneType timestampTzType && timestampTzType.getPrecision() <= 6) {
Comment thread
raunaqmorarka marked this conversation as resolved.
// For now preserve timestamptz columns so that we can parse partitioning
type = TIMESTAMP_TZ_MICROS;
}
else {
type = typeForMaterializedViewStorageTable(type);
Comment thread
findepi marked this conversation as resolved.
Outdated
}
return new ColumnMetadata(column.getName(), type);
}));
PartitionSpec partitionSpec = parsePartitionFields(schemaWithTimestampTzPreserved, getPartitioning(definition.getProperties()));
Set<String> temporalPartitioningSources = partitionSpec.fields().stream()
.flatMap(partitionField -> {
Types.NestedField sourceField = schemaWithTimestampTzPreserved.findField(partitionField.sourceId());
Type sourceType = toTrinoType(sourceField.type(), typeManager);
ColumnTransform columnTransform = getColumnTransform(partitionField, sourceType);
if (!columnTransform.isTemporal()) {
return Stream.of();
}
return Stream.of(sourceField.name());
})
.collect(toImmutableSet());

List<ColumnMetadata> columns = mappedCopy(
definition.getColumns(),
column -> {
Type type = typeManager.getType(column.getType());
if (type instanceof TimestampWithTimeZoneType timestampTzType && timestampTzType.getPrecision() <= 6 && temporalPartitioningSources.contains(column.getName())) {
// Apply point-in-time semantics to maintain partitioning capabilities
type = TIMESTAMP_TZ_MICROS;
}
else {
type = typeForMaterializedViewStorageTable(type);
}
return new ColumnMetadata(column.getName(), type);
});

ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty());
Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session);
Expand Down Expand Up @@ -258,9 +307,7 @@ private Type typeForMaterializedViewStorageTable(Type type)
: VARCHAR;
}
if (type instanceof TimestampWithTimeZoneType) {
// Iceberg does not store the time zone
// TODO allow temporal partitioning on these columns, or MV property to
// drop zone info and use timestamptz directly
// Iceberg does not store the time zone.
return VARCHAR;
}
if (type instanceof ArrayType arrayType) {
Expand Down
Loading