Show delete content in $files metadata table
0xffmeta committed Apr 19, 2024
1 parent 12b8e5f commit c5c856d
Showing 3 changed files with 103 additions and 24 deletions.
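
In brief: the $files system table previously planned its scan over the base table itself, so it could only ever surface DataFile entries; this commit plans the scan over Iceberg's FILES metadata table instead, whose rows describe data files and delete files alike. A minimal sketch of that access pattern, using the same Iceberg APIs the diff relies on (icebergTable, snapshotId, and the process callback are illustrative placeholders, not code from this commit):

import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;

import static org.apache.iceberg.MetadataTableType.FILES;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;

class FilesMetadataScanSketch
{
    void scanFileEntries(Table icebergTable, long snapshotId)
            throws IOException
    {
        // Wrap the base table as its FILES metadata table; planning this scan
        // yields DataTasks whose rows are file entries (data files, position
        // deletes, equality deletes) rather than splits of table data
        TableScan tableScan = createMetadataTableInstance(icebergTable, FILES)
                .newScan()
                .useSnapshot(snapshotId)
                .includeColumnStats();
        try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
            for (FileScanTask task : tasks) {
                DataTask dataTask = (DataTask) task;
                try (CloseableIterable<StructLike> rows = dataTask.rows()) {
                    rows.forEach(this::process);
                }
            }
        }
    }

    private void process(StructLike fileEntry)
    {
        // illustrative placeholder: FilesTable unwraps each row into a
        // ContentFile (via reflection, see the diff below) and emits one
        // $files output row per entry
    }
}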
@@ -33,9 +33,11 @@
import io.trino.spi.type.MapType;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
-import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableGroup;
@@ -48,6 +50,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -64,6 +67,8 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.MetadataTableType.FILES;
+import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;

public class FilesTable
implements SystemTable
@@ -121,7 +126,8 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
}

Map<Integer, Type> idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema());
-TableScan tableScan = icebergTable.newScan()
+TableScan tableScan = createMetadataTableInstance(icebergTable, FILES)
+.newScan()
.useSnapshot(snapshotId.get())
.includeColumnStats();

@@ -174,45 +180,85 @@ public CloseableIterator<List<Object>> iterator()
addCloseable(planFilesIterator);

return new CloseableIterator<>() {
+private CloseableIterator<StructLike> currentIterator = CloseableIterator.empty();
+
@Override
public boolean hasNext()
{
-return !closed && planFilesIterator.hasNext();
+updateCurrentIterator();
+return !closed && currentIterator.hasNext();
}

@Override
public List<Object> next()
{
-return getRecord(planFilesIterator.next().file());
+updateCurrentIterator();
+
+StructLike dataTaskFile = currentIterator.next();
+
+StructLike innerContentFile = null;
+
+// StructWithReadableMetrics is a private class in iceberg-core,
+// so reflection is needed to reach the wrapped ContentFile
+try {
+Class<?> clazz = dataTaskFile.getClass();
+Field field = clazz.getDeclaredField("struct");
+field.setAccessible(true);
+innerContentFile = (StructLike) field.get(dataTaskFile);
+}
+catch (NoSuchFieldException | IllegalAccessException e) {
+throw new RuntimeException("Failed to access field struct from " + dataTaskFile.getClass().getName(), e);
+}
+
+if (innerContentFile instanceof ContentFile<?> contentFile) {
+return getRecord(contentFile);
+}
+else {
+throw new IllegalArgumentException("Unknown ContentFile type for " + dataTaskFile.getClass().getName());
+}
}

+private void updateCurrentIterator()
+{
+try {
+while (!closed && !currentIterator.hasNext() && planFilesIterator.hasNext()) {
+currentIterator.close();
+DataTask dataTask = (DataTask) planFilesIterator.next();
+currentIterator = dataTask.rows().iterator();
+}
+}
+catch (IOException e) {
+throw new UncheckedIOException(e);
+}
+}
+
@Override
public void close()
throws IOException
{
-PlanFilesIterable.super.close();
+FilesTable.PlanFilesIterable.super.close();
closed = true;
}
};
}

-private List<Object> getRecord(DataFile dataFile)
+private List<Object> getRecord(ContentFile<?> contentFile)
{
List<Object> columns = new ArrayList<>();
-columns.add(dataFile.content().id());
-columns.add(dataFile.path().toString());
-columns.add(dataFile.format().name());
-columns.add(dataFile.recordCount());
-columns.add(dataFile.fileSizeInBytes());
-columns.add(getIntegerBigintSqlMap(dataFile.columnSizes()));
-columns.add(getIntegerBigintSqlMap(dataFile.valueCounts()));
-columns.add(getIntegerBigintSqlMap(dataFile.nullValueCounts()));
-columns.add(getIntegerBigintSqlMap(dataFile.nanValueCounts()));
-columns.add(getIntegerVarcharSqlMap(dataFile.lowerBounds()));
-columns.add(getIntegerVarcharSqlMap(dataFile.upperBounds()));
-columns.add(toVarbinarySlice(dataFile.keyMetadata()));
-columns.add(toBigintArrayBlock(dataFile.splitOffsets()));
-columns.add(toIntegerArrayBlock(dataFile.equalityFieldIds()));
+columns.add(contentFile.content().id());
+columns.add(contentFile.path().toString());
+columns.add(contentFile.format().name());
+columns.add(contentFile.recordCount());
+columns.add(contentFile.fileSizeInBytes());
+columns.add(getIntegerBigintSqlMap(contentFile.columnSizes()));
+columns.add(getIntegerBigintSqlMap(contentFile.valueCounts()));
+columns.add(getIntegerBigintSqlMap(contentFile.nullValueCounts()));
+columns.add(getIntegerBigintSqlMap(contentFile.nanValueCounts()));
+columns.add(getIntegerVarcharSqlMap(contentFile.lowerBounds()));
+columns.add(getIntegerVarcharSqlMap(contentFile.upperBounds()));
+columns.add(toVarbinarySlice(contentFile.keyMetadata()));
+columns.add(toBigintArrayBlock(contentFile.splitOffsets()));
+columns.add(toIntegerArrayBlock(contentFile.equalityFieldIds()));
checkArgument(columns.size() == types.size(), "Expected %s types in row, but got %s values", types.size(), columns.size());
return columns;
}
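For orientation before the test changes: the content values asserted below come from Iceberg's FileContent enum, whose ids back the content column of $files. A trivial sketch of the mapping (the class name is made up for illustration):

import org.apache.iceberg.FileContent;

class FileContentIds
{
    public static void main(String[] args)
    {
        // ids as defined by the Iceberg spec for the $files content column
        System.out.println(FileContent.DATA.id());              // 0
        System.out.println(FileContent.POSITION_DELETES.id());  // 1
        System.out.println(FileContent.EQUALITY_DELETES.id());  // 2
    }
}

This is also why testFilesTableWithDelete below expects 4 data files (each of the two partitioned INSERTs in setUp spans two _date partitions) and 2 position-delete files (one per single-row DELETE).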
@@ -18,6 +18,7 @@
import io.trino.testing.MaterializedResult;
import io.trino.testing.MaterializedRow;
import io.trino.testing.QueryRunner;
+import org.apache.iceberg.FileContent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -92,6 +93,13 @@ public void setUp()
assertUpdate("DELETE FROM test_schema.test_table_with_dml WHERE _date = DATE '2022-02-02' AND _varchar = 'b2'", 1);
assertUpdate("INSERT INTO test_schema.test_table_with_dml VALUES ('c3', DATE '2022-03-03'), ('d1', DATE '2022-04-04')", 2);
assertQuery("SELECT count(*) FROM test_schema.test_table_with_dml", "VALUES 7");

assertUpdate("CREATE TABLE test_schema.test_table_with_delete (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
assertUpdate("INSERT INTO test_schema.test_table_with_delete VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3);
assertUpdate("INSERT INTO test_schema.test_table_with_delete VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3);
assertUpdate("DELETE FROM test_schema.test_table_with_delete WHERE _bigint = 5", 1);
assertUpdate("DELETE FROM test_schema.test_table_with_delete WHERE _bigint = 2", 1);
assertQuery("SELECT count(*) FROM test_schema.test_table_with_delete", "VALUES 4");
}

@AfterAll
@@ -103,6 +111,7 @@ public void tearDown()
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_nan");
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_dml");
assertUpdate("DROP TABLE IF EXISTS test_schema.test_metadata_log_entries");
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_delete");
assertUpdate("DROP SCHEMA IF EXISTS test_schema");
}

@@ -350,6 +359,14 @@ public void testFilesTable()
assertQuerySucceeds("SELECT * FROM test_schema.\"test_table$files\"");
}

+@Test
+public void testFilesTableWithDelete()
+{
+assertQuery("SELECT count(*) FROM test_schema.\"test_table_with_delete$files\" WHERE content = " + FileContent.DATA.id(), "VALUES 4");
+assertQuery("SELECT count(*) FROM test_schema.\"test_table_with_delete$files\" WHERE content = " + FileContent.POSITION_DELETES.id(), "VALUES 2");
+assertQuery("SELECT count(*) FROM test_schema.\"test_table_with_delete$files\" WHERE content = " + FileContent.EQUALITY_DELETES.id(), "VALUES 0");
+}
+
private Long nanCount(long value)
{
// Parquet does not have nan count metrics
@@ -43,6 +43,7 @@
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -208,6 +209,11 @@ public void testV2TableWithEqualityDelete()
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE regionkey != 1");
// nationkey is before the equality delete column in the table schema, comment is after
assertQuery("SELECT nationkey, comment FROM " + tableName, "SELECT nationkey, comment FROM nation WHERE regionkey != 1");

assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation", 25);
writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[]{2L})), ImmutableMap.of("regionkey", 2L));
// the equality delete file is applied to 2 data files
assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE content = " + FileContent.EQUALITY_DELETES.id(), "VALUES 2");
}

@Test
@@ -762,8 +768,6 @@ public void testFilesTable()
.withEncryptionKeyMetadata(ByteBuffer.wrap("Trino".getBytes(UTF_8)))
.build();
table.newAppend().appendFile(dataFile).commit();
-// TODO Currently, Trino does not include equality delete files stats in the $files table.
-// Once it is fixed by https://github.com/trinodb/trino/pull/16232, include equality delete output in the test.
writeEqualityDeleteToNationTable(table);
assertQuery(
"SELECT " +
@@ -805,7 +809,19 @@ public void testFilesTable()
JSON '{"1":"4"}',
X'54 72 69 6e 6f',
ARRAY[4L],
-null)
+null),
+(2,
+'PARQUET',
+1L,
+JSON '{"3":46}',
+JSON '{"3":1}',
+JSON '{"3":0}',
+JSON '{}',
+JSON '{"3":"1"}',
+JSON '{"3":"1"}',
+null,
+ARRAY[4],
+ARRAY[3])
""");
}

@@ -1021,7 +1037,7 @@ private BaseTable loadTable(String tableName)

private List<String> getActiveFiles(String tableName)
{
-return computeActual(format("SELECT file_path FROM \"%s$files\"", tableName)).getOnlyColumn()
+return computeActual(format("SELECT file_path FROM \"%s$files\" WHERE content = %d", tableName, FileContent.DATA.id())).getOnlyColumn()
.map(String.class::cast)
.collect(toImmutableList());
}
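One practical consequence, visible in the getActiveFiles change above: callers that treated every $files row as a data file must now filter on content. A hedged JDBC sketch of the same filtered query outside the test harness (host, catalog, schema, and table names are made-up examples, not part of this commit):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

class ActiveDataFilesExample
{
    public static void main(String[] args)
            throws Exception
    {
        // assumes a reachable Trino coordinator with an "iceberg" catalog
        try (Connection connection = DriverManager.getConnection(
                "jdbc:trino://localhost:8080/iceberg/test_schema", "admin", null);
                Statement statement = connection.createStatement();
                // content = 0 keeps data files only, now that delete files are listed too
                ResultSet resultSet = statement.executeQuery(
                        "SELECT file_path FROM \"test_table$files\" WHERE content = 0")) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString("file_path"));
            }
        }
    }
}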
