Commit 529d39b

Address PR comments - will rebase and clean commits after all feedback is addressed
1 parent 7b27893 commit 529d39b

14 files changed (+502 −226 lines)

core/trino-main/src/main/java/io/trino/connector/DefaultCatalogFactory.java

Lines changed: 3 additions & 3 deletions
```diff
@@ -19,7 +19,6 @@
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
 import io.trino.connector.informationschema.InformationSchemaConnector;
-import io.trino.connector.system.DefaultSystemTablesProvider;
 import io.trino.connector.system.SystemConnector;
 import io.trino.connector.system.SystemTablesProvider;
 import io.trino.execution.scheduler.NodeSchedulerConfig;
@@ -155,7 +154,7 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
                 accessControl,
                 maxPrefetchedInformationSchemaPrefixes));
 
-        SystemTablesProvider systemTablesProvider = new DefaultSystemTablesProvider(
+        SystemTablesProvider systemTablesProvider = new SystemTablesProvider(
                 transactionManager,
                 metadata,
                 catalogHandle.getCatalogName().toString(),
@@ -170,7 +169,8 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
                 systemTablesProvider,
                 transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle),
                 accessControl,
-                catalogHandle.getCatalogName().toString(), catalogConnector.getPageSourceProviderFactory()));
+                catalogHandle.getCatalogName().toString(),
+                catalogConnector.getPageSourceProviderFactory()));
 
         return new CatalogConnector(
                 catalogHandle,
```

core/trino-main/src/main/java/io/trino/connector/system/DefaultSystemTablesProvider.java

Lines changed: 0 additions & 94 deletions
This file was deleted.

core/trino-main/src/main/java/io/trino/connector/system/SystemPageSourceProvider.java

Lines changed: 4 additions & 3 deletions
```diff
@@ -63,14 +63,15 @@ public class SystemPageSourceProvider
     private final SystemTablesProvider tables;
     private final AccessControl accessControl;
     private final String catalogName;
-    private final Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory;
+    private final Optional<ConnectorPageSourceProvider> connectorPageSourceProvider;
 
     public SystemPageSourceProvider(SystemTablesProvider tables, AccessControl accessControl, String catalogName, Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory)
     {
         this.tables = requireNonNull(tables, "tables is null");
         this.accessControl = requireNonNull(accessControl, "accessControl is null");
         this.catalogName = requireNonNull(catalogName, "catalogName is null");
-        this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null");
+        this.connectorPageSourceProvider = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null")
+                .map(ConnectorPageSourceProviderFactory::createPageSourceProvider);
     }
 
     @Override
@@ -87,7 +88,7 @@ public ConnectorPageSource createPageSource(
 
         // if the split is not a SystemSplit, we immediately delegate to the Connector to build a PageSource
         if (!(split instanceof SystemSplit systemSplit)) {
-            return pageSourceProviderFactory.orElseThrow().createPageSourceProvider()
+            return connectorPageSourceProvider.orElseThrow()
                     .createPageSource(systemTransaction.getConnectorTransactionHandle(), session, split, table, columns, dynamicFilter);
         }
 
```
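The constructor now resolves the connector's page source provider once via `Optional.map`, instead of invoking the factory again for every non-system split. A minimal sketch of that pattern, using hypothetical `Factory` and `Provider` stand-ins rather than Trino's actual SPI types:

```java
import java.util.Optional;

// Hypothetical stand-ins for ConnectorPageSourceProviderFactory / ConnectorPageSourceProvider.
interface Provider
{
    String createPageSource(String split);
}

interface Factory
{
    Provider createPageSourceProvider();
}

class DelegatingPageSource
{
    private final Optional<Provider> delegate;

    DelegatingPageSource(Optional<Factory> factory)
    {
        // Resolve the provider once, at construction time, instead of once per split.
        this.delegate = factory.map(Factory::createPageSourceProvider);
    }

    String createPageSource(String split)
    {
        // Fails fast when no connector-provided provider was registered.
        return delegate.orElseThrow().createPageSource(split);
    }
}
```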

core/trino-main/src/main/java/io/trino/connector/system/SystemTablesProvider.java

Lines changed: 67 additions & 9 deletions
```diff
@@ -11,23 +11,81 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.trino.connector.system;
 
+import io.trino.FullConnectorSession;
+import io.trino.Session;
+import io.trino.metadata.Metadata;
+import io.trino.metadata.QualifiedObjectName;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SystemTable;
+import io.trino.transaction.TransactionManager;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
-public interface SystemTablesProvider
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
+
+public class SystemTablesProvider
 {
-    Set<SystemTable> listSystemTables(ConnectorSession session);
-
-    /**
-     * Resolves table name. Returns {@link Optional#empty()} if table is not found.
-     * Some tables which are not part of set returned by {@link #listSystemTables(ConnectorSession)}
-     * can still be validly resolved.
-     */
-    Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName);
+    private final TransactionManager transactionManager;
+    private final Metadata metadata;
+    private final String catalogName;
+    private final Set<SystemTable> systemTables;
+    private final Map<SchemaTableName, SystemTable> systemTablesMap;
+
+    public SystemTablesProvider(TransactionManager transactionManager, Metadata metadata, String catalogName, Set<SystemTable> systemTables)
+    {
+        this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
+        this.metadata = requireNonNull(metadata, "metadata is null");
+        this.catalogName = requireNonNull(catalogName, "catalogName is null");
+        this.systemTables = requireNonNull(systemTables, "systemTables is null");
+        this.systemTablesMap = systemTables.stream()
+                .collect(toImmutableMap(
+                        table -> table.getTableMetadata().getTable(),
+                        identity()));
+    }
+
+    public Set<SystemTable> listSystemTables(ConnectorSession session)
+    {
+        // dynamic tables are not listed, so only list static tables
+        return systemTables;
+    }
+
+    public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
+    {
+        Optional<SystemTable> staticSystemTable = Optional.ofNullable(systemTablesMap.get(tableName));
+        if (staticSystemTable.isPresent()) {
+            return staticSystemTable;
+        }
+
+        // This means there is no known static table, but that doesn't mean a dynamic table must exist.
+        // This node could have a different config that causes that table to not exist.
+        if (!isCoordinatorTransaction(session)) {
+            // this is a session from another coordinator, so there are no dynamic tables here for that session
+            return Optional.empty();
+        }
+
+        // dynamic tables that are not in SINGLE_COORDINATOR mode need to implement the
+        // PageSourceProvider interface for SystemSplit through the Connector interface
+        return metadata.getSystemTable(
+                ((FullConnectorSession) session).getSession(),
+                new QualifiedObjectName(catalogName, tableName.getSchemaName(), tableName.getTableName()));
+    }
+
+    private boolean isCoordinatorTransaction(ConnectorSession connectorSession)
+    {
+        return Optional.of(connectorSession)
+                .filter(FullConnectorSession.class::isInstance)
+                .map(FullConnectorSession.class::cast)
+                .map(FullConnectorSession::getSession)
+                .flatMap(Session::getTransactionId)
+                .map(transactionManager::transactionExists)
+                .orElse(false);
+    }
 }
```
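The merged class resolves a table in two steps: a hit in the static map built in the constructor, otherwise a fallback to `Metadata.getSystemTable`, and that fallback only runs when the session's transaction is known to this coordinator. A minimal sketch of that resolution order, using hypothetical `Table` and `Catalog` stand-ins rather than Trino's classes:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-ins: Table plays the role of SystemTable, Catalog the role of Metadata.
record Table(String name) {}

interface Catalog
{
    Optional<Table> lookupDynamic(String name);
}

class TablesProvider
{
    private final Map<String, Table> staticTables;
    private final Catalog catalog;

    TablesProvider(Set<Table> tables, Catalog catalog)
    {
        // Index the static tables by name once, mirroring systemTablesMap above.
        this.staticTables = tables.stream().collect(Collectors.toMap(Table::name, Function.identity()));
        this.catalog = catalog;
    }

    Optional<Table> get(String name, boolean coordinatorTransaction)
    {
        Table staticTable = staticTables.get(name);
        if (staticTable != null) {
            return Optional.of(staticTable);
        }
        if (!coordinatorTransaction) {
            // A session owned by another coordinator cannot see this node's dynamic tables.
            return Optional.empty();
        }
        return catalog.lookupDynamic(name);
    }
}
```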

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -779,7 +779,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
             case PARTITIONS -> Optional.of(new PartitionsTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
             case ALL_MANIFESTS -> Optional.of(new AllManifestsTable(tableName, table, icebergScanExecutor));
             case MANIFESTS -> Optional.of(new ManifestsTable(tableName, table, getCurrentSnapshotId(table)));
-            case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table), icebergScanExecutor));
+            case FILES -> Optional.of(new FilesTable(tableName, typeManager, table, getCurrentSnapshotId(table)));
             case ALL_ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ALL_ENTRIES, icebergScanExecutor));
             case ENTRIES -> Optional.of(new EntriesTable(typeManager, tableName, table, ENTRIES, icebergScanExecutor));
             case PROPERTIES -> Optional.of(new PropertiesTable(tableName, table));
```

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java

Lines changed: 4 additions & 0 deletions
```diff
@@ -33,6 +33,7 @@
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hive.parquet.ParquetWriterConfig;
+import io.trino.plugin.iceberg.bean.PartitionFieldSummaryBean;
 import io.trino.plugin.iceberg.cache.IcebergCacheKeyProvider;
 import io.trino.plugin.iceberg.catalog.rest.DefaultIcebergFileSystemFactory;
 import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
@@ -60,10 +61,12 @@
 import io.trino.spi.function.FunctionProvider;
 import io.trino.spi.function.table.ConnectorTableFunction;
 import io.trino.spi.procedure.Procedure;
+import org.apache.iceberg.ManifestFile;
 
 import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonBinder.jsonBinder;
 import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;
 
@@ -105,6 +108,7 @@ public void configure(Binder binder)
         newOptionalBinder(binder, Key.get(HiveMetastoreFactory.class, RawHiveMetastoreFactory.class));
 
         jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class);
+        jsonBinder(binder).addDeserializerBinding(ManifestFile.PartitionFieldSummary.class).to(PartitionFieldSummaryBean.Deserializer.class);
 
         binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
         newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
```
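The new binding routes Jackson deserialization of ManifestFile.PartitionFieldSummary through PartitionFieldSummaryBean.Deserializer via airlift's JsonBinder. The bean itself is not shown in this diff; a minimal sketch of the shape such a binding takes, using a hypothetical Summary interface and SummaryBean record instead of the Iceberg types:

```java
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.google.inject.Binder;
import com.google.inject.Module;

import java.io.IOException;

import static io.airlift.json.JsonBinder.jsonBinder;

// Hypothetical interface standing in for ManifestFile.PartitionFieldSummary.
interface Summary
{
    boolean containsNull();
}

// Hypothetical bean plus nested Jackson deserializer, mirroring the
// PartitionFieldSummaryBean.Deserializer naming pattern in the binding above.
record SummaryBean(boolean containsNull)
        implements Summary
{
    public static class Deserializer
            extends JsonDeserializer<Summary>
    {
        @Override
        public Summary deserialize(JsonParser parser, DeserializationContext context)
                throws IOException
        {
            // Materialize the interface as the concrete bean type.
            return parser.readValueAs(SummaryBean.class);
        }
    }
}

class ExampleModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Registers the custom deserializer on airlift's shared ObjectMapper,
        // so JSON codecs can deserialize values declared as the interface.
        jsonBinder(binder).addDeserializerBinding(Summary.class).to(SummaryBean.Deserializer.class);
    }
}
```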

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -246,9 +246,9 @@ public ConnectorPageSource createPageSource(
         if (connectorSplit instanceof FilesTableSplit filesTableSplit) {
             return new FilesTablePageSource(
                     typeManager,
-                    fileSystemFactory.create(session.getIdentity(), filesTableSplit.fileSystemProperties()),
+                    fileSystemFactory.create(session.getIdentity(), filesTableSplit.fileIoProperties()),
                     fileIoFactory,
-                    columns.stream().map(SystemColumnHandle.class::cast).map(SystemColumnHandle::columnName).toList(),
+                    columns.stream().map(SystemColumnHandle.class::cast).map(SystemColumnHandle::columnName).collect(toImmutableList()),
                     filesTableSplit);
         }
 
```
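The collector change swaps Stream.toList() for Guava's toImmutableList(), which also rejects null elements at collection time. A small illustration of the difference (not Trino code):

```java
import java.util.List;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;

class ListCollectorsExample
{
    public static void main(String[] args)
    {
        // Java 16+: unmodifiable list, but null elements are allowed.
        List<String> jdkList = Stream.of("a", null, "b").toList();

        // Guava: ImmutableList throws NullPointerException on null elements.
        List<String> guavaList = Stream.of("a", "b").collect(toImmutableList());

        System.out.println(jdkList + " / " + guavaList);
    }
}
```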

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionsTable.java

Lines changed: 1 addition & 10 deletions
```diff
@@ -154,16 +154,7 @@ public static List<PartitionField> getAllPartitionFields(Schema schema, Map<Inte
 
     public static List<PartitionField> getAllPartitionFields(Table icebergTable)
     {
-        Set<Integer> existingColumnsIds = TypeUtil.indexById(icebergTable.schema().asStruct()).keySet();
-
-        List<PartitionField> visiblePartitionFields = icebergTable.specs()
-                .values().stream()
-                .flatMap(partitionSpec -> partitionSpec.fields().stream())
-                // skip columns that were dropped
-                .filter(partitionField -> existingColumnsIds.contains(partitionField.sourceId()))
-                .collect(toImmutableList());
-
-        return filterOutDuplicates(visiblePartitionFields);
+        return getAllPartitionFields(icebergTable.schema(), icebergTable.specs());
     }
 
     private static List<PartitionField> filterOutDuplicates(List<PartitionField> visiblePartitionFields)
```
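The one-argument overload now delegates to the existing two-argument getAllPartitionFields (its signature is visible in the hunk header above), which presumably applies the same dropped-column filtering and de-duplication as the deleted body. A rough sketch of what that overload would look like, reconstructed from the removed code and not necessarily the actual Trino implementation:

```java
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;

import java.util.List;
import java.util.Map;
import java.util.Set;

final class PartitionFieldsSketch
{
    private PartitionFieldsSketch() {}

    // Assumed shape of the two-argument overload: the same logic as the removed
    // one-argument body, with the schema and partition specs passed in explicitly.
    static List<PartitionField> getAllPartitionFields(Schema schema, Map<Integer, PartitionSpec> partitionSpecs)
    {
        Set<Integer> existingColumnIds = TypeUtil.indexById(schema.asStruct()).keySet();
        return partitionSpecs.values().stream()
                .flatMap(partitionSpec -> partitionSpec.fields().stream())
                // skip partition fields whose source column was dropped
                .filter(partitionField -> existingColumnIds.contains(partitionField.sourceId()))
                // stand-in for the class's private filterOutDuplicates helper shown above
                .distinct()
                .collect(ImmutableList.toImmutableList());
    }
}
```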
