Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;
import jakarta.annotation.Nullable;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetricsUtil.ReadableMetricsStruct;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
Expand All @@ -43,6 +44,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.wrappedHeapBuffer;
import static io.trino.plugin.iceberg.FilesTable.getIcebergIdToTypeMapping;
Expand All @@ -59,8 +61,10 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;

// https://iceberg.apache.org/docs/latest/spark-queries/#all-entries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get why the same table would be needed under a different name.
Could you please elaborate on the need for it ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$entries returns the entries of the current snapshot. $all_entries returns entries for all snapshots.
$all_entries covers $entries, but $entries allows inspecting the current status without filtering.

// https://iceberg.apache.org/docs/latest/spark-queries/#entries
public class EntriesTable
extends BaseSystemTable
Expand All @@ -70,15 +74,16 @@ public class EntriesTable
private final Optional<IcebergPartitionColumn> partitionColumn;
private final List<Type> partitionTypes;

public EntriesTable(TypeManager typeManager, SchemaTableName tableName, Table icebergTable, ExecutorService executor)
public EntriesTable(TypeManager typeManager, SchemaTableName tableName, Table icebergTable, MetadataTableType metadataTableType, ExecutorService executor)
{
super(
requireNonNull(icebergTable, "icebergTable is null"),
new ConnectorTableMetadata(
requireNonNull(tableName, "tableName is null"),
columns(requireNonNull(typeManager, "typeManager is null"), icebergTable)),
ENTRIES,
metadataTableType,
executor);
checkArgument(metadataTableType == ALL_ENTRIES || metadataTableType == ENTRIES, "Unexpected metadata table type: %s", metadataTableType);
idToTypeMapping = getIcebergIdToTypeMapping(icebergTable.schema());
primitiveFields = IcebergUtil.primitiveFields(icebergTable.schema()).stream()
.sorted(Comparator.comparing(Types.NestedField::name))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
import static org.apache.iceberg.MetadataTableType.ENTRIES;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations;
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
Expand Down Expand Up @@ -707,7 +709,8 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, icebergScanExecutor));
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, icebergScanExecutor));
case ALL_ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ALL_ENTRIES, icebergScanExecutor));
case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ENTRIES, icebergScanExecutor));
case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
case REFS -> Optional.of(new RefsTable(tableName, table, icebergScanExecutor));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum TableType
MANIFESTS,
PARTITIONS,
FILES,
ALL_ENTRIES,
ENTRIES,
PROPERTIES,
REFS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,27 @@ public void testFilesTableWithDelete()
assertUpdate("DROP TABLE IF EXISTS test_schema.test_table_with_delete");
}

@Test
void testAllEntriesTable()
{
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_all_entries", "AS SELECT 1 id, DATE '2014-01-01' dt")) {
assertThat(query("DESCRIBE \"" + table.getName() + "$all_entries\""))
.matches("DESCRIBE \"" + table.getName() + "$entries\"");

assertThat(query("SELECT * FROM \"" + table.getName() + "$all_entries\""))
.matches("SELECT * FROM \"" + table.getName() + "$entries\"");

assertUpdate("DELETE FROM " + table.getName(), 1);

assertThat(computeActual("SELECT status FROM \"" + table.getName() + "$all_entries\"").getOnlyColumnAsSet())
.containsExactly(1, 2);
assertThat(computeActual("SELECT status FROM \"" + table.getName() + "$entries\"").getOnlyColumnAsSet())
.containsExactly(2);
assertThat(query("SELECT * FROM \"" + table.getName() + "$all_entries\" WHERE status = 2"))
.matches("SELECT * FROM \"" + table.getName() + "$entries\"");
}
}

@Test
void testEntriesTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static io.trino.plugin.hive.metastore.MetastoreMethod.GET_TABLES;
import static io.trino.plugin.hive.metastore.MetastoreMethod.REPLACE_TABLE;
import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE;
import static io.trino.plugin.iceberg.TableType.ALL_ENTRIES;
import static io.trino.plugin.iceberg.TableType.ALL_MANIFESTS;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TableType.ENTRIES;
Expand Down Expand Up @@ -339,6 +340,12 @@ public void testSelectSystemTable()
.addCopies(GET_TABLE, 1)
.build());

// select from $all_entries
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$all_entries\"",
ImmutableMultiset.<MetastoreMethod>builder()
.addCopies(GET_TABLE, 1)
.build());

// select from $entries
assertMetastoreInvocations("SELECT * FROM \"test_select_snapshots$entries\"",
ImmutableMultiset.<MetastoreMethod>builder()
Expand All @@ -356,7 +363,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ALL_ENTRIES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static io.trino.plugin.hive.metastore.glue.GlueMetastoreMethod.GET_TABLES;
import static io.trino.plugin.hive.metastore.glue.GlueMetastoreMethod.UPDATE_TABLE;
import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE;
import static io.trino.plugin.iceberg.TableType.ALL_ENTRIES;
import static io.trino.plugin.iceberg.TableType.ALL_MANIFESTS;
import static io.trino.plugin.iceberg.TableType.DATA;
import static io.trino.plugin.iceberg.TableType.ENTRIES;
Expand Down Expand Up @@ -471,6 +472,12 @@ public void testSelectSystemTable()
.add(GET_TABLE)
.build());

// select from $all_entries
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$all_entries\"",
ImmutableMultiset.<GlueMetastoreMethod>builder()
.add(GET_TABLE)
.build());

// select from $entries
assertGlueMetastoreApiInvocations("SELECT * FROM \"test_select_snapshots$entries\"",
ImmutableMultiset.<GlueMetastoreMethod>builder()
Expand All @@ -494,7 +501,7 @@ public void testSelectSystemTable()

// This test should get updated if a new system table is added.
assertThat(TableType.values())
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
.containsExactly(DATA, HISTORY, METADATA_LOG_ENTRIES, SNAPSHOTS, ALL_MANIFESTS, MANIFESTS, PARTITIONS, FILES, ALL_ENTRIES, ENTRIES, PROPERTIES, REFS, MATERIALIZED_VIEW_STORAGE);
}
finally {
getQueryRunner().execute("DROP TABLE IF EXISTS test_select_snapshots");
Expand Down
Loading