Skip to content

Commit c9c1654

Browse files
committed
Address PR comments - will rebase and clean commits after all feedback is addressed
1 parent 359f634 commit c9c1654

File tree

13 files changed

+402
-226
lines changed

13 files changed

+402
-226
lines changed

core/trino-main/src/main/java/io/trino/connector/DefaultCatalogFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.opentelemetry.api.OpenTelemetry;
2020
import io.opentelemetry.api.trace.Tracer;
2121
import io.trino.connector.informationschema.InformationSchemaConnector;
22-
import io.trino.connector.system.DefaultSystemTablesProvider;
2322
import io.trino.connector.system.SystemConnector;
2423
import io.trino.connector.system.SystemTablesProvider;
2524
import io.trino.execution.scheduler.NodeSchedulerConfig;
@@ -155,7 +154,7 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
155154
accessControl,
156155
maxPrefetchedInformationSchemaPrefixes));
157156

158-
SystemTablesProvider systemTablesProvider = new DefaultSystemTablesProvider(
157+
SystemTablesProvider systemTablesProvider = new SystemTablesProvider(
159158
transactionManager,
160159
metadata,
161160
catalogHandle.getCatalogName().toString(),
@@ -170,7 +169,8 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
170169
systemTablesProvider,
171170
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle),
172171
accessControl,
173-
catalogHandle.getCatalogName().toString(), catalogConnector.getPageSourceProviderFactory()));
172+
catalogHandle.getCatalogName().toString(),
173+
catalogConnector.getPageSourceProviderFactory()));
174174

175175
return new CatalogConnector(
176176
catalogHandle,

core/trino-main/src/main/java/io/trino/connector/system/DefaultSystemTablesProvider.java

Lines changed: 0 additions & 94 deletions
This file was deleted.

core/trino-main/src/main/java/io/trino/connector/system/SystemPageSourceProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,15 @@ public class SystemPageSourceProvider
6363
private final SystemTablesProvider tables;
6464
private final AccessControl accessControl;
6565
private final String catalogName;
66-
private final Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory;
66+
private final Optional<ConnectorPageSourceProvider> connectorPageSourceProvider;
6767

6868
public SystemPageSourceProvider(SystemTablesProvider tables, AccessControl accessControl, String catalogName, Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory)
6969
{
7070
this.tables = requireNonNull(tables, "tables is null");
7171
this.accessControl = requireNonNull(accessControl, "accessControl is null");
7272
this.catalogName = requireNonNull(catalogName, "catalogName is null");
73-
this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null");
73+
this.connectorPageSourceProvider = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null")
74+
.map(ConnectorPageSourceProviderFactory::createPageSourceProvider);
7475
}
7576

7677
@Override
@@ -87,7 +88,7 @@ public ConnectorPageSource createPageSource(
8788

8889
// if the split is not a SystemSplit, we immediately delegate to the Connector to build a PageSource
8990
if (!(split instanceof SystemSplit systemSplit)) {
90-
return pageSourceProviderFactory.orElseThrow().createPageSourceProvider()
91+
return connectorPageSourceProvider.orElseThrow()
9192
.createPageSource(systemTransaction.getConnectorTransactionHandle(), session, split, table, columns, dynamicFilter);
9293
}
9394

core/trino-main/src/main/java/io/trino/connector/system/SystemTablesProvider.java

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,78 @@
1313
*/
1414
package io.trino.connector.system;
1515

16+
import io.trino.FullConnectorSession;
17+
import io.trino.Session;
18+
import io.trino.metadata.Metadata;
19+
import io.trino.metadata.QualifiedObjectName;
1620
import io.trino.spi.connector.ConnectorSession;
1721
import io.trino.spi.connector.SchemaTableName;
1822
import io.trino.spi.connector.SystemTable;
23+
import io.trino.transaction.TransactionManager;
1924

25+
import java.util.Map;
2026
import java.util.Optional;
2127
import java.util.Set;
2228

23-
public interface SystemTablesProvider
29+
import static com.google.common.collect.ImmutableMap.toImmutableMap;
30+
import static java.util.Objects.requireNonNull;
31+
import static java.util.function.Function.identity;
32+
33+
public class SystemTablesProvider
2434
{
25-
Set<SystemTable> listSystemTables(ConnectorSession session);
26-
27-
/**
28-
* Resolves table name. Returns {@link Optional#empty()} if table is not found.
29-
* Some tables which are not part of set returned by {@link #listSystemTables(ConnectorSession)}
30-
* can still be validly resolved.
31-
*/
32-
Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName);
35+
private final TransactionManager transactionManager;
36+
private final Metadata metadata;
37+
private final String catalogName;
38+
private final Set<SystemTable> systemTables;
39+
private final Map<SchemaTableName, SystemTable> systemTablesMap;
40+
41+
public SystemTablesProvider(TransactionManager transactionManager, Metadata metadata, String catalogName, Set<SystemTable> systemTables)
42+
{
43+
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
44+
this.metadata = requireNonNull(metadata, "metadata is null");
45+
this.catalogName = requireNonNull(catalogName, "catalogName is null");
46+
this.systemTables = requireNonNull(systemTables, "systemTables is null");
47+
this.systemTablesMap = systemTables.stream()
48+
.collect(toImmutableMap(
49+
table -> table.getTableMetadata().getTable(),
50+
identity()));
51+
}
52+
53+
public Set<SystemTable> listSystemTables(ConnectorSession session)
54+
{
55+
// dynamic are not listed, so ony list static tables
56+
return systemTables;
57+
}
58+
59+
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
60+
{
61+
Optional<SystemTable> staticSystemTable = Optional.ofNullable(systemTablesMap.get(tableName));
62+
if (staticSystemTable.isPresent()) {
63+
return staticSystemTable;
64+
}
65+
66+
// This means there is no known static table, but that doesn't mean a dynamic table must exist.
67+
// This node could have a different config that causes that table to not exist.
68+
if (!isCoordinatorTransaction(session)) {
69+
// this is a session from another coordinator, so there are no dynamic tables here for that session
70+
return Optional.empty();
71+
}
72+
73+
// dynamic tables that are not SINGLE_COORDINATOR mode need to implement the
74+
// PageSourceProvider interface for SystemSplit through the Connector interface
75+
return metadata.getSystemTable(
76+
((FullConnectorSession) session).getSession(),
77+
new QualifiedObjectName(catalogName, tableName.getSchemaName(), tableName.getTableName()));
78+
}
79+
80+
private boolean isCoordinatorTransaction(ConnectorSession connectorSession)
81+
{
82+
return Optional.of(connectorSession)
83+
.filter(FullConnectorSession.class::isInstance)
84+
.map(FullConnectorSession.class::cast)
85+
.map(FullConnectorSession::getSession)
86+
.flatMap(Session::getTransactionId)
87+
.map(transactionManager::transactionExists)
88+
.orElse(false);
89+
}
3390
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
784784
case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
785785
case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, icebergScanExecutor));
786786
case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
787-
case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
787+
case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
788788
case ALL_ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ALL_ENTRIES, icebergScanExecutor));
789789
case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ENTRIES, icebergScanExecutor));
790790
case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,9 @@ public ConnectorPageSource createPageSource(
246246
if (connectorSplit instanceof FilesTableSplit filesTableSplit) {
247247
return new FilesTablePageSource(
248248
typeManager,
249-
fileSystemFactory.create(session.getIdentity(), filesTableSplit.fileSystemProperties()),
249+
fileSystemFactory.create(session.getIdentity(), filesTableSplit.fileIoProperties()),
250250
fileIoFactory,
251-
columns.stream().map(SystemColumnHandle.class::cast).map(SystemColumnHandle::columnName).toList(),
251+
columns.stream().map(SystemColumnHandle.class::cast).map(SystemColumnHandle::columnName).collect(toImmutableList()),
252252
filesTableSplit);
253253
}
254254

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionsTable.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,16 +154,7 @@ public static List<PartitionField> getAllPartitionFields(Schema schema, Map<Inte
154154

155155
public static List<PartitionField> getAllPartitionFields(Table icebergTable)
156156
{
157-
Set<Integer> existingColumnsIds = TypeUtil.indexById(icebergTable.schema().asStruct()).keySet();
158-
159-
List<PartitionField> visiblePartitionFields = icebergTable.specs()
160-
.values().stream()
161-
.flatMap(partitionSpec -> partitionSpec.fields().stream())
162-
// skip columns that were dropped
163-
.filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
164-
.collect(toImmutableList());
165-
166-
return filterOutDuplicates(visiblePartitionFields);
157+
return getAllPartitionFields(icebergTable.schema(), icebergTable.specs());
167158
}
168159

169160
private static List<PartitionField> filterOutDuplicates(List<PartitionField> visiblePartitionFields)

0 commit comments

Comments
 (0)