Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.graph.Traverser;
import com.google.inject.Inject;
import io.airlift.slice.Slice;
import io.trino.annotation.NotThreadSafe;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
Expand Down Expand Up @@ -61,6 +62,7 @@
import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext;
import io.trino.plugin.iceberg.delete.DeleteFile;
import io.trino.plugin.iceberg.delete.DeleteFilter;
import io.trino.plugin.iceberg.delete.EqualityDeleteFilter;
import io.trino.plugin.iceberg.delete.PositionDeleteFilter;
import io.trino.plugin.iceberg.delete.RowPredicate;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappedField;
Expand All @@ -100,6 +103,9 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
Expand Down Expand Up @@ -353,6 +359,7 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) {
List<DeleteFilter> deleteFilters = readDeletes(
session,
tableSchema,
readColumns,
path,
deletes,
readerPageSourceWithRowPositions.getStartRowPosition(),
Expand Down Expand Up @@ -390,6 +397,7 @@ else if (deleteFile.content() == EQUALITY_DELETES) {
private List<DeleteFilter> readDeletes(
ConnectorSession session,
Schema schema,
List<IcebergColumnHandle> readColumns,
String dataFilePath,
List<DeleteFile> deleteFiles,
Optional<Long> startRowPosition,
Expand All @@ -400,6 +408,7 @@ private List<DeleteFilter> readDeletes(
Slice targetPath = utf8Slice(dataFilePath);
List<DeleteFilter> filters = new ArrayList<>();
LongBitmapDataProvider deletedRows = new Roaring64Bitmap();
Map<Set<Integer>, EqualityDeleteSet> deletesSetByFieldIds = new HashMap<>();

IcebergColumnHandle deleteFilePath = getColumnHandle(DELETE_FILE_PATH, typeManager);
IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager);
Expand Down Expand Up @@ -436,14 +445,17 @@ private List<DeleteFilter> readDeletes(
}
}
else if (delete.content() == EQUALITY_DELETES) {
Comment thread
Heltman marked this conversation as resolved.
Outdated
List<Integer> fieldIds = delete.equalityFieldIds();
Set<Integer> fieldIds = ImmutableSet.copyOf(delete.equalityFieldIds());
verify(!fieldIds.isEmpty(), "equality field IDs are missing");
List<IcebergColumnHandle> columns = fieldIds.stream()
.map(id -> getColumnHandle(schema.findField(id), typeManager))
Schema deleteSchema = TypeUtil.select(schema, fieldIds);
List<IcebergColumnHandle> columns = deleteSchema.columns().stream()
.map(column -> getColumnHandle(column, typeManager))
.collect(toImmutableList());

EqualityDeleteSet equalityDeleteSet = deletesSetByFieldIds.computeIfAbsent(fieldIds, key -> new EqualityDeleteSet(deleteSchema, schemaFromHandles(readColumns)));

try (ConnectorPageSource pageSource = openDeletes(session, delete, columns, TupleDomain.all())) {
filters.add(readEqualityDeletes(pageSource, columns, schema));
readEqualityDeletes(pageSource, columns, equalityDeleteSet::add);
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand All @@ -458,6 +470,10 @@ else if (delete.content() == EQUALITY_DELETES) {
filters.add(new PositionDeleteFilter(deletedRows));
}

for (EqualityDeleteSet equalityDeleteSet : deletesSetByFieldIds.values()) {
filters.add(new EqualityDeleteFilter(equalityDeleteSet::contains));
}

return filters;
}

Expand Down Expand Up @@ -1537,4 +1553,27 @@ public int hashCode()
return Objects.hash(baseColumnIdentity, path);
}
}

@NotThreadSafe
private static class EqualityDeleteSet
Comment thread
findepi marked this conversation as resolved.
Outdated
{
private final StructLikeSet deleteSet;
private final StructProjection projection;

public EqualityDeleteSet(Schema deleteSchema, Schema dataSchema)
{
this.deleteSet = StructLikeSet.create(deleteSchema.asStruct());
this.projection = StructProjection.create(dataSchema, deleteSchema);
}

public void add(StructLike row)
{
deleteSet.add(row);
}

public boolean contains(StructLike row)
{
return deleteSet.contains(projection.wrap(row));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,22 @@
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.Type;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
import static java.util.Objects.requireNonNull;

public final class EqualityDeleteFilter
implements DeleteFilter
{
private final Schema schema;
private final StructLikeSet deleteSet;
private final Predicate<StructLike> deletedRows;

private EqualityDeleteFilter(Schema schema, StructLikeSet deleteSet)
public EqualityDeleteFilter(Predicate<StructLike> deletedRows)
{
this.schema = requireNonNull(schema, "schema is null");
this.deleteSet = requireNonNull(deleteSet, "deleteSet is null");
this.deletedRows = requireNonNull(deletedRows, "deletedRows is null");
}

@Override
Expand All @@ -46,35 +42,27 @@ public RowPredicate createPredicate(List<IcebergColumnHandle> columns)
.map(IcebergColumnHandle::getType)
.toArray(Type[]::new);

Schema fileSchema = schemaFromHandles(columns);
StructProjection projection = StructProjection.create(fileSchema, schema);

return (page, position) -> {
StructLike row = new LazyTrinoRow(types, page, position);
return !deleteSet.contains(projection.wrap(row));
return !deletedRows.test(row);
};
}

public static DeleteFilter readEqualityDeletes(ConnectorPageSource pageSource, List<IcebergColumnHandle> columns, Schema tableSchema)
public static void readEqualityDeletes(ConnectorPageSource pageSource, List<IcebergColumnHandle> columns, Consumer<StructLike> deletedRows)
{
Type[] types = columns.stream()
.map(IcebergColumnHandle::getType)
.toArray(Type[]::new);

Schema deleteSchema = schemaFromHandles(columns);
StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());

while (!pageSource.isFinished()) {
Page page = pageSource.getNextPage();
if (page == null) {
continue;
}

for (int position = 0; position < page.getPositionCount(); position++) {
deleteSet.add(new TrinoRow(types, page, position));
deletedRows.accept(new TrinoRow(types, page, position));
}
}

return new EqualityDeleteFilter(deleteSchema, deleteSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -307,6 +308,129 @@ public void testSelectivelyOptimizingLeavesEqualityDeletes()
assertThat(loadTable(tableName).currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("1");
}

@Test
Comment thread
Heltman marked this conversation as resolved.
Outdated
public void testMultipleEqualityDeletes()
Comment thread
Heltman marked this conversation as resolved.
Outdated
throws Exception
{
String tableName = "test_multiple_equality_deletes_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");

for (int i = 1; i < 3; i++) {
writeEqualityDeleteToNationTable(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", Integer.toUnsignedLong(i)));
}

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE (regionkey != 1L AND regionkey != 2L)");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testMultipleEqualityDeletesWithEquivalentSchemas()
throws Exception
{
String tableName = "test_multiple_equality_deletes_equivalent_schemas_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");
Schema deleteRowSchema = new Schema(ImmutableList.of("regionkey", "name").stream()
.map(name -> icebergTable.schema().findField(name))
.collect(toImmutableList()));
List<Integer> equalityFieldIds = ImmutableList.of("regionkey", "name").stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
.collect(toImmutableList());
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", 1L, "name", "BRAZIL"),
deleteRowSchema,
equalityFieldIds);
Schema equivalentDeleteRowSchema = new Schema(ImmutableList.of("name", "regionkey").stream()
.map(name -> icebergTable.schema().findField(name))
.collect(toImmutableList()));
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("name", "INDIA", "regionkey", 2L),
equivalentDeleteRowSchema,
equalityFieldIds);

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT ((regionkey = 1 AND name = 'BRAZIL') OR (regionkey = 2 AND name = 'INDIA'))");
assertUpdate("DROP TABLE " + tableName);
}

Comment thread
Heltman marked this conversation as resolved.
Outdated
@Test
public void testMultipleEqualityDeletesWithDifferentSchemas()
throws Exception
{
String tableName = "test_multiple_equality_deletes_different_schemas_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", 1L, "name", "BRAZIL"),
Optional.of(ImmutableList.of("regionkey", "name")));
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("name", "ALGERIA"),
Optional.of(ImmutableList.of("name")));
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("regionkey", 2L),
Optional.of(ImmutableList.of("regionkey")));

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE NOT ((regionkey = 1 AND name = 'BRAZIL') OR regionkey = 2 OR name = 'ALGERIA')");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testMultipleEqualityDeletesWithNestedFields()
throws Exception
{
String tableName = "test_multiple_equality_deletes_nested_fields_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " ( id BIGINT, root ROW(nested BIGINT, nested_other BIGINT))");
assertUpdate("INSERT INTO " + tableName + " VALUES (1, row(10, 100))", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, row(20, 200))", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES (2, row(20, 200))", 1);
Table icebergTable = loadTable(tableName);
assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0");

List<String> deleteFileColumns = ImmutableList.of("root.nested");
Schema deleteRowSchema = icebergTable.schema().select(deleteFileColumns);
List<Integer> equalityFieldIds = ImmutableList.of("root.nested").stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
.collect(toImmutableList());
Types.StructType nestedStructType = (Types.StructType) deleteRowSchema.findField("root").type();
Record nestedStruct = GenericRecord.create(nestedStructType);
nestedStruct.setField("nested", 20L);
for (int i = 1; i < 3; i++) {
writeEqualityDeleteToNationTableWithDeleteColumns(
icebergTable,
Optional.empty(),
Optional.empty(),
ImmutableMap.of("root", nestedStruct),
deleteRowSchema,
equalityFieldIds);
}

// TODO: support read equality deletes with nested fields(https://github.com/trinodb/trino/issues/18625)
assertThatThrownBy(() -> query("SELECT * FROM " + tableName)).hasMessageContaining("Multiple entries with same key");
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testOptimizingWholeTableRemovesEqualityDeletes()
throws Exception
Expand Down Expand Up @@ -826,22 +950,50 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Optional<Parti
writeEqualityDeleteToNationTable(icebergTable, partitionSpec, partitionData, ImmutableMap.of("regionkey", 1L));
}

private void writeEqualityDeleteToNationTable(Table icebergTable, Optional<PartitionSpec> partitionSpec, Optional<PartitionData> partitionData, Map<String, Object> overwriteValues)
private void writeEqualityDeleteToNationTable(
Table icebergTable,
Optional<PartitionSpec> partitionSpec,
Optional<PartitionData> partitionData,
Map<String, Object> overwriteValues)
throws Exception
{
writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, Optional.empty());
}

private void writeEqualityDeleteToNationTableWithDeleteColumns(
Table icebergTable,
Optional<PartitionSpec> partitionSpec,
Optional<PartitionData> partitionData,
Map<String, Object> overwriteValues,
Optional<List<String>> deleteFileColumns)
throws Exception
{
List<String> deleteColumns = deleteFileColumns.orElse(new ArrayList<>(overwriteValues.keySet()));
Schema deleteRowSchema = icebergTable.schema().select(deleteColumns);
Comment thread
Heltman marked this conversation as resolved.
Outdated
List<Integer> equalityDeleteFieldIds = deleteColumns.stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
Comment thread
Heltman marked this conversation as resolved.
Outdated
.collect(toImmutableList());
writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, deleteRowSchema, equalityDeleteFieldIds);
}

private void writeEqualityDeleteToNationTableWithDeleteColumns(
Table icebergTable,
Optional<PartitionSpec> partitionSpec,
Optional<PartitionData> partitionData,
Map<String, Object> overwriteValues,
Schema deleteRowSchema,
List<Integer> equalityDeleteFieldIds)
throws Exception
{
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + UUID.randomUUID();
FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION));

Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet());
List<Integer> equalityFieldIds = overwriteValues.keySet().stream()
.map(name -> deleteRowSchema.findField(name).fieldId())
.collect(toImmutableList());
Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(fileIo.newOutputFile(new Path(metadataDir, deleteFileName).toString()))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.equalityFieldIds(equalityFieldIds)
.equalityFieldIds(equalityDeleteFieldIds)
.overwrite();
if (partitionSpec.isPresent() && partitionData.isPresent()) {
writerBuilder = writerBuilder
Expand Down