@@ -61,6 +61,8 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;

@@ -202,13 +204,26 @@ private void appendDataFile(RowBlockBuilder blockBuilder, StructProjection dataF
Map<Integer, Long> nanValueCounts = dataFile.get(++position, Map.class);
appendIntegerBigintMap((MapBlockBuilder) fieldBuilders.get(position), nanValueCounts);

//noinspection unchecked
Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);
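// position delete files keep bounds for the reserved pos/path metadata columns rather than
// table columns, so they cannot be rendered against the table schema like data file bounds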
switch (ContentType.of(content)) {
case DATA, EQUALITY_DELETE -> {
//noinspection unchecked
Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);

//noinspection unchecked
Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
//noinspection unchecked
Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
appendIntegerVarcharMap((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
}
case POSITION_DELETE -> {
//noinspection unchecked
Map<Integer, ByteBuffer> lowerBounds = dataFile.get(++position, Map.class);
appendBoundsForPositionDelete((MapBlockBuilder) fieldBuilders.get(position), lowerBounds);

//noinspection unchecked
Map<Integer, ByteBuffer> upperBounds = dataFile.get(++position, Map.class);
appendBoundsForPositionDelete((MapBlockBuilder) fieldBuilders.get(position), upperBounds);
}
}

ByteBuffer keyMetadata = dataFile.get(++position, ByteBuffer.class);
if (keyMetadata == null) {
@@ -222,12 +237,30 @@ private void appendDataFile(RowBlockBuilder blockBuilder, StructProjection dataF
List<Long> splitOffsets = dataFile.get(++position, List.class);
appendBigintArray((ArrayBlockBuilder) fieldBuilders.get(position), splitOffsets);

//noinspection unchecked
List<Long> equalityIds = dataFile.get(++position, List.class);
appendBigintArray((ArrayBlockBuilder) fieldBuilders.get(position), equalityIds);
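// equality ids exist only for equality delete files, and position delete files also lack a
// sort order id, so the remaining manifest fields are appended per content type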
switch (ContentType.of(content)) {
case DATA -> {
// data files don't have equality ids
fieldBuilders.get(++position).appendNull();

Integer sortOrderId = dataFile.get(++position, Integer.class);
INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
Integer sortOrderId = dataFile.get(++position, Integer.class);
INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
}
case POSITION_DELETE -> {
// position delete files don't have equality ids
fieldBuilders.get(++position).appendNull();

// position delete files don't have sort order id
fieldBuilders.get(++position).appendNull();
}
case EQUALITY_DELETE -> {
//noinspection unchecked
List<Integer> equalityIds = dataFile.get(++position, List.class);
appendIntegerArray((ArrayBlockBuilder) fieldBuilders.get(position), equalityIds);

Integer sortOrderId = dataFile.get(++position, Integer.class);
INTEGER.writeLong(fieldBuilders.get(position), Long.valueOf(sortOrderId));
}
}
});
}

@@ -244,6 +277,19 @@ public static void appendBigintArray(ArrayBlockBuilder blockBuilder, @Nullable L
});
}

public static void appendIntegerArray(ArrayBlockBuilder blockBuilder, @Nullable List<Integer> values)
{
if (values == null) {
blockBuilder.appendNull();
return;
}
blockBuilder.buildEntry(elementBuilder -> {
for (Integer value : values) {
INTEGER.writeLong(elementBuilder, value);
}
});
}

private static void appendIntegerBigintMap(MapBlockBuilder blockBuilder, @Nullable Map<Integer, Long> values)
{
if (values == null) {
@@ -268,4 +314,43 @@ private void appendIntegerVarcharMap(MapBlockBuilder blockBuilder, @Nullable Map
VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(type, Conversions.fromByteBuffer(type, value)));
}));
}

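// position delete bounds are keyed by the reserved metadata column ids for pos (a long)
// and file_path (a string), so fixed types are used to render the human-readable values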
private static void appendBoundsForPositionDelete(MapBlockBuilder blockBuilder, @Nullable Map<Integer, ByteBuffer> values)
{
if (values == null) {
blockBuilder.appendNull();
return;
}

blockBuilder.buildEntry((keyBuilder, valueBuilder) -> {
INTEGER.writeLong(keyBuilder, DELETE_FILE_POS.fieldId());
ByteBuffer pos = values.get(DELETE_FILE_POS.fieldId());
checkArgument(pos != null, "delete file pos is null");
VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(Types.LongType.get(), Conversions.fromByteBuffer(Types.LongType.get(), pos)));

INTEGER.writeLong(keyBuilder, DELETE_FILE_PATH.fieldId());
ByteBuffer path = values.get(DELETE_FILE_PATH.fieldId());
checkArgument(path != null, "delete file path is null");
VARCHAR.writeString(valueBuilder, Transforms.identity().toHumanString(Types.StringType.get(), Conversions.fromByteBuffer(Types.StringType.get(), path)));
});
}

private enum ContentType
{
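// mirrors the content field of an Iceberg manifest entry: 0 = data, 1 = position deletes, 2 = equality deletes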
DATA,
POSITION_DELETE,
EQUALITY_DELETE;

static ContentType of(int content)
{
checkArgument(content >= 0 && content <= 2, "Unexpected content type: %s", content);
if (content == 0) {
return DATA;
}
if (content == 1) {
return POSITION_DELETE;
}
return EQUALITY_DELETE;
}
}
}
@@ -38,18 +38,22 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@@ -639,6 +643,129 @@ void testEntriesTable()
}
}

@Test
void testEntriesAfterPositionDelete()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
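// a filtered DELETE on an Iceberg v2 table is written as a position delete file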
assertUpdate("DELETE FROM " + table.getName() + " WHERE id = 1", 1);

Table icebergTable = loadTable(table.getName());
Snapshot snapshot = icebergTable.currentSnapshot();
long snapshotId = snapshot.snapshotId();
long sequenceNumber = snapshot.sequenceNumber();

assertThat(computeScalar("SELECT status FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(1);
assertThat(computeScalar("SELECT snapshot_id FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(snapshotId);
assertThat(computeScalar("SELECT sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(sequenceNumber);
assertThat(computeScalar("SELECT file_sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(2L);

MaterializedRow deleteFile = (MaterializedRow) computeScalar("SELECT data_file FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId);
assertThat(deleteFile.getFieldCount()).isEqualTo(16);
assertThat(deleteFile.getField(0)).isEqualTo(1); // content
assertThat((String) deleteFile.getField(1)).endsWith(format.toString().toLowerCase(ENGLISH)); // file_path
assertThat(deleteFile.getField(2)).isEqualTo(format.toString()); // file_format
assertThat(deleteFile.getField(3)).isEqualTo(0); // spec_id
assertThat(deleteFile.getField(4)).isEqualTo(1L); // record_count
assertThat((long) deleteFile.getField(5)).isPositive(); // file_size_in_bytes

//noinspection unchecked
Map<Integer, Long> columnSizes = (Map<Integer, Long>) deleteFile.getField(6);
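// Iceberg's ORC metrics do not populate column_sizes, so the map is null for ORC delete files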
switch (format) {
case ORC -> assertThat(columnSizes).isNull();
case PARQUET -> assertThat(columnSizes)
.hasSize(2)
.satisfies(_ -> assertThat(columnSizes.get(DELETE_FILE_POS.fieldId())).isPositive())
.satisfies(_ -> assertThat(columnSizes.get(DELETE_FILE_PATH.fieldId())).isPositive());
default -> throw new IllegalArgumentException("Unsupported format: " + format);
}

assertThat(deleteFile.getField(7)).isEqualTo(Map.of(DELETE_FILE_POS.fieldId(), 1L, DELETE_FILE_PATH.fieldId(), 1L)); // value_counts
assertThat(deleteFile.getField(8)).isEqualTo(Map.of(DELETE_FILE_POS.fieldId(), 0L, DELETE_FILE_PATH.fieldId(), 0L)); // null_value_counts
assertThat(deleteFile.getField(9)).isEqualTo(value(Map.of(), null)); // nan_value_counts

// lower_bounds
//noinspection unchecked
Map<Integer, String> lowerBounds = (Map<Integer, String>) deleteFile.getField(10);
assertThat(lowerBounds)
.hasSize(2)
.satisfies(_ -> assertThat(lowerBounds.get(DELETE_FILE_POS.fieldId())).isEqualTo("0"))
.satisfies(_ -> assertThat(lowerBounds.get(DELETE_FILE_PATH.fieldId())).contains(table.getName()));

// upper_bounds
//noinspection unchecked
Map<Integer, String> upperBounds = (Map<Integer, String>) deleteFile.getField(11);
assertThat(upperBounds)
.hasSize(2)
.satisfies(_ -> assertThat(upperBounds.get(DELETE_FILE_POS.fieldId())).isEqualTo("0"))
.satisfies(_ -> assertThat(upperBounds.get(DELETE_FILE_PATH.fieldId())).contains(table.getName()));

assertThat(deleteFile.getField(12)).isNull(); // key_metadata
assertThat(deleteFile.getField(13)).isEqualTo(List.of(value(4L, 3L))); // split_offsets
assertThat(deleteFile.getField(14)).isNull(); // equality_ids
assertThat(deleteFile.getField(15)).isNull(); // sort_order_id

assertThat(computeScalar("SELECT readable_metrics FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo("""
{\
"dt":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},\
"id":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}\
}""");
}
}

@Test
void testEntriesAfterEqualityDelete()
throws Exception
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
Table icebergTable = loadTable(table.getName());
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "0");
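// Trino itself only writes position deletes, so the equality delete is produced directly through the Iceberg API helper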
writeEqualityDeleteForTable(icebergTable, fileSystemFactory, Optional.empty(), Optional.empty(), ImmutableMap.of("id", 1), Optional.empty());
assertThat(icebergTable.currentSnapshot().summary()).containsEntry("total-equality-deletes", "1");

Snapshot snapshot = icebergTable.currentSnapshot();
long snapshotId = snapshot.snapshotId();
long sequenceNumber = snapshot.sequenceNumber();

assertThat(computeScalar("SELECT status FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(1);
assertThat(computeScalar("SELECT snapshot_id FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(snapshotId);
assertThat(computeScalar("SELECT sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(sequenceNumber);
assertThat(computeScalar("SELECT file_sequence_number FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo(2L);

MaterializedRow dataFile = (MaterializedRow) computeScalar("SELECT data_file FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId);
assertThat(dataFile.getFieldCount()).isEqualTo(16);
assertThat(dataFile.getField(0)).isEqualTo(2); // content
assertThat(dataFile.getField(3)).isEqualTo(0); // spec_id
assertThat(dataFile.getField(4)).isEqualTo(1L); // record_count
assertThat((long) dataFile.getField(5)).isPositive(); // file_size_in_bytes
assertThat(dataFile.getField(6)).isEqualTo(Map.of(1, 45L)); // column_sizes
assertThat(dataFile.getField(7)).isEqualTo(Map.of(1, 1L)); // value_counts
assertThat(dataFile.getField(8)).isEqualTo(Map.of(1, 0L)); // null_value_counts
assertThat(dataFile.getField(9)).isEqualTo(Map.of()); // nan_value_counts
assertThat(dataFile.getField(10)).isEqualTo(Map.of(1, "1")); // lower_bounds
assertThat(dataFile.getField(11)).isEqualTo(Map.of(1, "1")); // upper_bounds
assertThat(dataFile.getField(12)).isNull(); // key_metadata
assertThat(dataFile.getField(13)).isEqualTo(List.of(4L)); // split_offsets
assertThat(dataFile.getField(14)).isEqualTo(List.of(1)); // equality_ids
assertThat(dataFile.getField(15)).isEqualTo(0); // sort_order_id

assertThat(computeScalar("SELECT readable_metrics FROM \"" + table.getName() + "$entries\"" + " WHERE snapshot_id = " + snapshotId))
.isEqualTo("""
{\
"dt":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null},\
"id":{"column_size":45,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}\
}""");
}
}

@Test
public void testPartitionColumns()
{