Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -114,7 +112,6 @@ public Map<Integer, Long> getColumnSizes()

public static class Builder
{
private final Map<Integer, Type.PrimitiveType> idToTypeMapping;
private final List<Types.NestedField> columns;
private final TypeManager typeManager;
private final Map<Integer, Optional<Long>> nullCounts = new HashMap<>();
Expand All @@ -127,11 +124,9 @@ public static class Builder
private long size;

public Builder(
Map<Integer, Type.PrimitiveType> idToTypeMapping,
List<Types.NestedField> columns,
TypeManager typeManager)
{
this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null"));
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.typeManager = requireNonNull(typeManager, "typeManager is null");

Expand Down Expand Up @@ -161,8 +156,6 @@ public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec)
.map(PartitionField::sourceId)
.collect(toImmutableSet());
Map<Integer, Optional<String>> partitionValues = getPartitionKeys(dataFile.partition(), partitionSpec);
Map<Integer, Object> lowerBounds = convertBounds(dataFile.lowerBounds());
Map<Integer, Object> upperBounds = convertBounds(dataFile.upperBounds());
for (Types.NestedField column : partitionSpec.schema().columns()) {
int id = column.fieldId();
io.trino.spi.type.Type trinoType = fieldIdToTrinoType.get(id);
Expand All @@ -187,8 +180,10 @@ public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec)
}
}
else {
Object lowerBound = convertIcebergValueToTrino(column.type(), lowerBounds.getOrDefault(id, null));
Object upperBound = convertIcebergValueToTrino(column.type(), upperBounds.getOrDefault(id, null));
Object lowerBound = convertIcebergValueToTrino(column.type(),
Conversions.fromByteBuffer(column.type(), Optional.ofNullable(dataFile.lowerBounds()).map(a -> a.get(id)).orElse(null)));
Object upperBound = convertIcebergValueToTrino(column.type(),
Conversions.fromByteBuffer(column.type(), Optional.ofNullable(dataFile.upperBounds()).map(a -> a.get(id)).orElse(null)));
Optional<Long> nullCount = Optional.ofNullable(dataFile.nullValueCounts().get(id));
updateMinMaxStats(
id,
Expand Down Expand Up @@ -252,28 +247,6 @@ private void updateMinMaxStats(
}).updateMinMax(lowerBound, upperBound);
}
}

/**
 * Converts a file's column bounds to a Map from field id to Iceberg Object representation.
 * <p>
 * Field ids that have no entry in {@code idToTypeMapping} are skipped rather than treated as
 * an error: a data file's metrics may still carry bounds for a field id that is not in the
 * mapping (presumably a column dropped after the file was written), and such bounds cannot
 * be decoded without a type.
 *
 * @param idToMetricMap A Map from field id to Iceberg ByteBuffer representation; may be null
 * @return A Map from field id to Iceberg Object representation; empty when {@code idToMetricMap}
 * is null, and without entries for ids whose type is unknown or whose value decodes to null
 */
private Map<Integer, Object> convertBounds(@Nullable Map<Integer, ByteBuffer> idToMetricMap)
{
    if (idToMetricMap == null) {
        return ImmutableMap.of();
    }
    ImmutableMap.Builder<Integer, Object> map = ImmutableMap.builder();
    idToMetricMap.forEach((id, value) -> {
        Type.PrimitiveType type = idToTypeMapping.get(id);
        if (type == null) {
            // No known type for this field id, so the bound cannot be decoded; ignore it
            return;
        }
        Object icebergRepresentation = Conversions.fromByteBuffer(type, value);
        // ImmutableMap rejects null values, so bounds that decode to null are left out
        if (icebergRepresentation != null) {
            map.put(id, icebergRepresentation);
        }
    });
    return map.buildOrThrow();
}
}

private static class ColumnStatistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private Map<StructLikeWrapper, IcebergStatistics> getStatisticsByPartition(Table

partitions.computeIfAbsent(
partitionWrapper,
ignored -> new IcebergStatistics.Builder(idToTypeMapping, icebergTable.schema().columns(), typeManager))
ignored -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager))
.acceptDataFile(dataFile, fileScanTask.spec());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private TableStatistics makeTableStatistics(IcebergTableHandle tableHandle, Cons
.useSnapshot(tableHandle.getSnapshotId().get())
.includeColumnStats();

IcebergStatistics.Builder icebergStatisticsBuilder = new IcebergStatistics.Builder(idToTypeMapping, columns, typeManager);
IcebergStatistics.Builder icebergStatisticsBuilder = new IcebergStatistics.Builder(columns, typeManager);
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask fileScanTask : fileScanTasks) {
DataFile dataFile = fileScanTask.file();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,33 @@ public void testPartitionTable()
assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(4), new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 4L, 5L, 0L)));
}

/**
 * Checks that the {@code $partitions} table can still be queried after a column is dropped,
 * and that the per-column statistics row no longer contains an entry for the dropped column.
 * <p>
 * The table is dropped in a {@code finally} block so that a failed assertion does not leave
 * it behind and break the {@code CREATE TABLE} of a subsequent run.
 */
@Test
public void testPartitionTableOnDropColumn()
{
    assertUpdate("CREATE TABLE test_schema.test_table_multi_column (_varchar VARCHAR, _bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
    try {
        assertUpdate("INSERT INTO test_schema.test_table_multi_column VALUES ('a', 0, CAST('2019-09-08' AS DATE)), ('a', 1, CAST('2019-09-09' AS DATE)), ('b', 2, CAST('2019-09-09' AS DATE))", 3);
        assertUpdate("INSERT INTO test_schema.test_table_multi_column VALUES ('c', 3, CAST('2019-09-09' AS DATE)), ('a', 4, CAST('2019-09-10' AS DATE)), ('b', 5, CAST('2019-09-10' AS DATE))", 3);
        assertQuery("SELECT count(*) FROM test_schema.test_table_multi_column", "VALUES 6");

        // Field 0 of each $partitions row is itself a row whose first field is the partition date
        Function<MaterializedRow, LocalDate> partitionDate = row -> (LocalDate) ((MaterializedRow) row.getField(0)).getField(0);

        MaterializedResult result = computeActual("SELECT * from test_schema.\"test_table_multi_column$partitions\"");
        assertEquals(result.getRowCount(), 3);
        Map<LocalDate, MaterializedRow> rowsByPartition = result.getMaterializedRows().stream()
                .collect(toImmutableMap(partitionDate, Function.identity()));

        // Before the drop: field 4 holds one nested row per non-partition column (_varchar, _bigint)
        assertEquals(rowsByPartition.get(LocalDate.parse("2019-09-08")).getField(4), new MaterializedRow(DEFAULT_PRECISION,
                new MaterializedRow(DEFAULT_PRECISION, "a", "a", 0L),
                new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L)));

        assertUpdate("ALTER TABLE test_schema.test_table_multi_column drop column _varchar");

        MaterializedResult resultAfterDrop = computeActual("SELECT * from test_schema.\"test_table_multi_column$partitions\"");
        assertEquals(resultAfterDrop.getRowCount(), 3);
        Map<LocalDate, MaterializedRow> rowsByPartitionAfterDrop = resultAfterDrop.getMaterializedRows().stream()
                .collect(toImmutableMap(partitionDate, Function.identity()));

        // After the drop: only the nested row for _bigint remains
        assertEquals(rowsByPartitionAfterDrop.get(LocalDate.parse("2019-09-08")).getField(4), new MaterializedRow(DEFAULT_PRECISION,
                new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L)));
    }
    finally {
        assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_multi_column");
    }
}

@Test
public void testHistoryTable()
{
Expand Down