Skip to content

Commit

Permalink
Show delete content in $files metadata table
Browse files Browse the repository at this point in the history
  • Loading branch information
0xffmeta authored and nineinchnick committed Aug 27, 2024
1 parent 3b1eb2f commit 3f19cc4
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import io.trino.spi.type.MapType;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableGroup;
Expand All @@ -48,6 +50,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -57,17 +60,35 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.immutableEntry;
import static com.google.common.collect.Streams.mapWithIndex;
import static io.trino.spi.block.MapValueBuilder.buildMapValue;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataTableType.FILES;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;

public class FilesTable
implements SystemTable
{
public static final String CONTENT_COLUMN_NAME = "content";
public static final String FILE_PATH_COLUMN_NAME = "file_path";
public static final String FILE_FORMAT_COLUMN_NAME = "file_format";
public static final String RECORD_COUNT_COLUMN_NAME = "record_count";
public static final String FILE_SIZE_IN_BYTES_COLUMN_NAME = "file_size_in_bytes";
public static final String COLUMN_SIZES_COLUMN_NAME = "column_sizes";
public static final String VALUE_COUNTS_COLUMN_NAME = "value_counts";
public static final String NULL_VALUE_COUNTS_COLUMN_NAME = "null_value_counts";
public static final String NAN_VALUE_COUNTS_COLUMN_NAME = "nan_value_counts";
public static final String LOWER_BOUNDS_COLUMN_NAME = "lower_bounds";
public static final String UPPER_BOUNDS_COLUMN_NAME = "upper_bounds";
public static final String KEY_METADATA_COLUMN_NAME = "key_metadata";
public static final String SPLIT_OFFSETS_COLUMN_NAME = "split_offsets";
public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
private final ConnectorTableMetadata tableMetadata;
private final TypeManager typeManager;
private final Table icebergTable;
Expand All @@ -80,20 +101,20 @@ public FilesTable(SchemaTableName tableName, TypeManager typeManager, Table iceb

tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("content", INTEGER))
.add(new ColumnMetadata("file_path", VARCHAR))
.add(new ColumnMetadata("file_format", VARCHAR))
.add(new ColumnMetadata("record_count", BIGINT))
.add(new ColumnMetadata("file_size_in_bytes", BIGINT))
.add(new ColumnMetadata("column_sizes", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("null_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("nan_value_counts", typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata("lower_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata("upper_bounds", typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata("key_metadata", VARBINARY))
.add(new ColumnMetadata("split_offsets", new ArrayType(BIGINT)))
.add(new ColumnMetadata("equality_ids", new ArrayType(INTEGER)))
.add(new ColumnMetadata(CONTENT_COLUMN_NAME, INTEGER))
.add(new ColumnMetadata(FILE_PATH_COLUMN_NAME, VARCHAR))
.add(new ColumnMetadata(FILE_FORMAT_COLUMN_NAME, VARCHAR))
.add(new ColumnMetadata(RECORD_COUNT_COLUMN_NAME, BIGINT))
.add(new ColumnMetadata(FILE_SIZE_IN_BYTES_COLUMN_NAME, BIGINT))
.add(new ColumnMetadata(COLUMN_SIZES_COLUMN_NAME, typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata(VALUE_COUNTS_COLUMN_NAME, typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata(NULL_VALUE_COUNTS_COLUMN_NAME, typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata(NAN_VALUE_COUNTS_COLUMN_NAME, typeManager.getType(mapType(INTEGER.getTypeSignature(), BIGINT.getTypeSignature()))))
.add(new ColumnMetadata(LOWER_BOUNDS_COLUMN_NAME, typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata(UPPER_BOUNDS_COLUMN_NAME, typeManager.getType(mapType(INTEGER.getTypeSignature(), VARCHAR.getTypeSignature()))))
.add(new ColumnMetadata(KEY_METADATA_COLUMN_NAME, VARBINARY))
.add(new ColumnMetadata(SPLIT_OFFSETS_COLUMN_NAME, new ArrayType(BIGINT)))
.add(new ColumnMetadata(EQUALITY_IDS_COLUMN_NAME, new ArrayType(INTEGER)))
.build());
this.snapshotId = requireNonNull(snapshotId, "snapshotId is null");
}
Expand Down Expand Up @@ -121,11 +142,16 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
}

Map<Integer, Type> idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema());
TableScan tableScan = icebergTable.newScan()
TableScan tableScan = createMetadataTableInstance(icebergTable, FILES)
.newScan()
.useSnapshot(snapshotId.get())
.includeColumnStats();

PlanFilesIterable planFilesIterable = new PlanFilesIterable(tableScan.planFiles(), idToTypeMapping, types, typeManager);
Map<String, Integer> columnNameToPosition = mapWithIndex(tableScan.schema().columns().stream(),
(column, position) -> immutableEntry(column.name(), Long.valueOf(position).intValue()))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

PlanFilesIterable planFilesIterable = new PlanFilesIterable(tableScan.planFiles(), idToTypeMapping, types, columnNameToPosition, typeManager);
return planFilesIterable.cursor();
}

Expand All @@ -136,15 +162,22 @@ private static class PlanFilesIterable
private final CloseableIterable<FileScanTask> planFiles;
private final Map<Integer, Type> idToTypeMapping;
private final List<io.trino.spi.type.Type> types;
private final Map<String, Integer> columnNameToPosition;
private boolean closed;
private final MapType integerToBigintMapType;
private final MapType integerToVarcharMapType;

public PlanFilesIterable(CloseableIterable<FileScanTask> planFiles, Map<Integer, Type> idToTypeMapping, List<io.trino.spi.type.Type> types, TypeManager typeManager)
public PlanFilesIterable(
CloseableIterable<FileScanTask> planFiles,
Map<Integer, Type> idToTypeMapping,
List<io.trino.spi.type.Type> types,
Map<String, Integer> columnNameToPosition,
TypeManager typeManager)
{
this.planFiles = requireNonNull(planFiles, "planFiles is null");
this.idToTypeMapping = ImmutableMap.copyOf(requireNonNull(idToTypeMapping, "idToTypeMapping is null"));
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
this.columnNameToPosition = ImmutableMap.copyOf(requireNonNull(columnNameToPosition, "columnNameToPosition is null"));
this.integerToBigintMapType = new MapType(INTEGER, BIGINT, typeManager.getTypeOperators());
this.integerToVarcharMapType = new MapType(INTEGER, VARCHAR, typeManager.getTypeOperators());
addCloseable(planFiles);
Expand Down Expand Up @@ -174,45 +207,64 @@ public CloseableIterator<List<Object>> iterator()
addCloseable(planFilesIterator);

return new CloseableIterator<>() {
private CloseableIterator<StructLike> currentIterator = CloseableIterator.empty();

@Override
public boolean hasNext()
{
return !closed && planFilesIterator.hasNext();
updateCurrentIterator();
return !closed && currentIterator.hasNext();
}

@Override
public List<Object> next()
{
return getRecord(planFilesIterator.next().file());
updateCurrentIterator();
return getRecord(currentIterator.next());
}

private void updateCurrentIterator()
{
try {
while (!closed && !currentIterator.hasNext() && planFilesIterator.hasNext()) {
currentIterator.close();
DataTask dataTask = (DataTask) planFilesIterator.next();
currentIterator = dataTask.rows().iterator();
}
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void close()
throws IOException
{
PlanFilesIterable.super.close();
currentIterator.close();
FilesTable.PlanFilesIterable.super.close();
closed = true;
}
};
}

private List<Object> getRecord(DataFile dataFile)
private List<Object> getRecord(StructLike structLike)
{
List<Object> columns = new ArrayList<>();
columns.add(dataFile.content().id());
columns.add(dataFile.path().toString());
columns.add(dataFile.format().name());
columns.add(dataFile.recordCount());
columns.add(dataFile.fileSizeInBytes());
columns.add(getIntegerBigintSqlMap(dataFile.columnSizes()));
columns.add(getIntegerBigintSqlMap(dataFile.valueCounts()));
columns.add(getIntegerBigintSqlMap(dataFile.nullValueCounts()));
columns.add(getIntegerBigintSqlMap(dataFile.nanValueCounts()));
columns.add(getIntegerVarcharSqlMap(dataFile.lowerBounds()));
columns.add(getIntegerVarcharSqlMap(dataFile.upperBounds()));
columns.add(toVarbinarySlice(dataFile.keyMetadata()));
columns.add(toBigintArrayBlock(dataFile.splitOffsets()));
columns.add(toIntegerArrayBlock(dataFile.equalityFieldIds()));
columns.add(structLike.get(columnNameToPosition.get(CONTENT_COLUMN_NAME), Integer.class));
columns.add(structLike.get(columnNameToPosition.get(FILE_PATH_COLUMN_NAME), String.class));
columns.add(structLike.get(columnNameToPosition.get(FILE_FORMAT_COLUMN_NAME), String.class));
columns.add(structLike.get(columnNameToPosition.get(RECORD_COUNT_COLUMN_NAME), Long.class));
columns.add(structLike.get(columnNameToPosition.get(FILE_SIZE_IN_BYTES_COLUMN_NAME), Long.class));
columns.add(getIntegerBigintSqlMap(structLike.get(columnNameToPosition.get(COLUMN_SIZES_COLUMN_NAME), Map.class)));
columns.add(getIntegerBigintSqlMap(structLike.get(columnNameToPosition.get(VALUE_COUNTS_COLUMN_NAME), Map.class)));
columns.add(getIntegerBigintSqlMap(structLike.get(columnNameToPosition.get(NULL_VALUE_COUNTS_COLUMN_NAME), Map.class)));
columns.add(getIntegerBigintSqlMap(structLike.get(columnNameToPosition.get(NAN_VALUE_COUNTS_COLUMN_NAME), Map.class)));
columns.add(getIntegerVarcharSqlMap(structLike.get(columnNameToPosition.get(LOWER_BOUNDS_COLUMN_NAME), Map.class)));
columns.add(getIntegerVarcharSqlMap(structLike.get(columnNameToPosition.get(UPPER_BOUNDS_COLUMN_NAME), Map.class)));
columns.add(toVarbinarySlice(structLike.get(columnNameToPosition.get(KEY_METADATA_COLUMN_NAME), ByteBuffer.class)));
columns.add(toBigintArrayBlock(structLike.get(columnNameToPosition.get(SPLIT_OFFSETS_COLUMN_NAME), List.class)));
columns.add(toIntegerArrayBlock(structLike.get(columnNameToPosition.get(EQUALITY_IDS_COLUMN_NAME), List.class)));
checkArgument(columns.size() == types.size(), "Expected %s types in row, but got %s values", types.size(), columns.size());
return columns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1584,7 +1584,7 @@ public void testUpdateWithSortOrder()
assertQuery(
"SELECT custkey, name, address, nationkey, phone, acctbal, mktsegment, comment FROM " + table.getName(),
"SELECT custkey, name, address, nationkey, phone, acctbal, mktsegment, substring(comment, 2) FROM customer");
for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\"").getOnlyColumnAsSet()) {
for (Object filePath : computeActual("SELECT file_path from \"" + table.getName() + "$files\" where content != 1").getOnlyColumnAsSet()) {
assertThat(isFileSorted((String) filePath, "comment")).isTrue();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
import org.apache.iceberg.FileContent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -96,6 +97,13 @@ public void setUp()
assertUpdate("DELETE FROM test_schema.test_table_with_dml WHERE _date = DATE '2022-02-02' AND _varchar = 'b2'", 1);
assertUpdate("INSERT INTO test_schema.test_table_with_dml VALUES ('c3', DATE '2022-03-03'), ('d1', DATE '2022-04-04')", 2);
assertQuery("SELECT count(*) FROM test_schema.test_table_with_dml", "VALUES 7");

assertUpdate("CREATE TABLE test_schema.test_table_with_delete (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
assertUpdate("INSERT INTO test_schema.test_table_with_delete VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3);
assertUpdate("INSERT INTO test_schema.test_table_with_delete VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3);
assertUpdate("DELETE FROM test_schema.test_table_with_delete WHERE _bigint = 5", 1);
assertUpdate("DELETE FROM test_schema.test_table_with_delete WHERE _bigint = 2", 1);
assertQuery("SELECT count(*) FROM test_schema.test_table_with_delete", "VALUES 4");
}

@AfterAll
Expand All @@ -107,6 +115,7 @@ public void tearDown()
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_nan");
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_dml");
assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries");
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_delete");
assertUpdate("DROP SCHEMA IF EXISTS test_schema");
}

Expand Down Expand Up @@ -363,6 +372,14 @@ public void testFilesTable()
.build());
}

@Test
public void testFilesTableWithDelete()
{
assertQuery("SELECT count(*) FROM test_schema.\"test_table_with_delete$files\" WHERE content = " + FileContent.DATA.id(), "VALUES 4");
assertQuery("SELECT count(*) FROM test_schema.\"test_table_with_delete$files\" WHERE content = " + FileContent.POSITION_DELETES.id(), "VALUES 2");
assertQuery("SELECT count(*) FROM test_schema.\"test_table_with_delete$files\" WHERE content = " + FileContent.EQUALITY_DELETES.id(), "VALUES 0");
}

private Long nanCount(long value)
{
// Parquet does not have nan count metrics
Expand Down
Loading

0 comments on commit 3f19cc4

Please sign in to comment.