diff --git a/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java b/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java index 7598411d2dd0..0fe2603af7f1 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java +++ b/core/trino-main/src/main/java/io/trino/connector/CatalogServiceProviderModule.java @@ -34,7 +34,7 @@ import io.trino.spi.connector.ConnectorIndexProvider; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; -import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.function.FunctionProvider; @@ -59,9 +59,9 @@ public static CatalogServiceProvider<ConnectorSplitManager> createSplitManagerPr @Provides @Singleton - public static CatalogServiceProvider<ConnectorPageSourceProvider> createPageSourceProvider(ConnectorServicesProvider connectorServicesProvider) + public static CatalogServiceProvider<ConnectorPageSourceProviderFactory> createPageSourceProviderFactory(ConnectorServicesProvider connectorServicesProvider) { - return new ConnectorCatalogServiceProvider<>("page source provider", connectorServicesProvider, connector -> connector.getPageSourceProvider().orElse(null)); + return new ConnectorCatalogServiceProvider<>("page source provider factory", connectorServicesProvider, connector -> connector.getPageSourceProviderFactory().orElse(null)); } @Provides diff --git a/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java b/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java index 416e0bc0e049..f22795ef8b7f 100644 --- a/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java +++ b/core/trino-main/src/main/java/io/trino/connector/ConnectorServices.java @@ -30,7 +30,7 @@ import io.trino.spi.connector.ConnectorIndexProvider; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; -import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSecurityContext; import io.trino.spi.connector.ConnectorSplitManager; @@ -78,7 +78,7 @@ public class ConnectorServices private final Optional<FunctionProvider> functionProvider; private final CatalogTableFunctions tableFunctions; private final Optional<ConnectorSplitManager> splitManager; - private final Optional<ConnectorPageSourceProvider> pageSourceProvider; + private final Optional<ConnectorPageSourceProviderFactory> pageSourceProviderFactory; private final Optional<ConnectorPageSinkProvider> pageSinkProvider; private final Optional<ConnectorIndexProvider> indexProvider; private final Optional<ConnectorNodePartitioningProvider> partitioningProvider; @@ -128,10 +128,10 @@ public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector c } this.splitManager = Optional.ofNullable(splitManager); - ConnectorPageSourceProvider connectorPageSourceProvider = null; + ConnectorPageSourceProviderFactory connectorPageSourceProviderFactory = null; try { - connectorPageSourceProvider = connector.getPageSourceProvider(); - requireNonNull(connectorPageSourceProvider, format("Connector '%s' returned a null page source provider", catalogHandle)); + connectorPageSourceProviderFactory = connector.getPageSourceProviderFactory(); + requireNonNull(connectorPageSourceProviderFactory, format("Connector '%s' returned a null page source provider factory", catalogHandle)); } catch (UnsupportedOperationException ignored) { }
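A minimal engine-side sketch of what this per-catalog binding is consumed for (hypothetical class; CatalogServiceProvider and the SPI types are real): resolve the catalog's factory once, then mint a provider whose lifetime is a single query. PageSourceManager further down implements exactly this shape.

    import io.trino.connector.CatalogServiceProvider;
    import io.trino.spi.connector.CatalogHandle;
    import io.trino.spi.connector.ConnectorPageSourceProvider;
    import io.trino.spi.connector.ConnectorPageSourceProviderFactory;

    public class PageSourceProviderLookup
    {
        private final CatalogServiceProvider<ConnectorPageSourceProviderFactory> factories;

        public PageSourceProviderLookup(CatalogServiceProvider<ConnectorPageSourceProviderFactory> factories)
        {
            this.factories = factories;
        }

        public ConnectorPageSourceProvider newProviderForQuery(CatalogHandle catalogHandle)
        {
            // one factory per catalog, one provider per query
            return factories.getService(catalogHandle).createPageSourceProvider();
        }
    }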
@@ -139,12 +139,13 @@ public ConnectorServices(Tracer tracer, CatalogHandle catalogHandle, Connector c try { ConnectorRecordSetProvider connectorRecordSetProvider = connector.getRecordSetProvider(); requireNonNull(connectorRecordSetProvider, format("Connector '%s' returned a null record set provider", catalogHandle)); - verify(connectorPageSourceProvider == null, "Connector '%s' returned both page source and record set providers", catalogHandle); - connectorPageSourceProvider = new RecordPageSourceProvider(connectorRecordSetProvider); + verify(connectorPageSourceProviderFactory == null, "Connector '%s' returned both page source and record set providers", catalogHandle); + var pageSourceProvider = new RecordPageSourceProvider(connectorRecordSetProvider); + connectorPageSourceProviderFactory = () -> pageSourceProvider; } catch (UnsupportedOperationException ignored) { } - this.pageSourceProvider = Optional.ofNullable(connectorPageSourceProvider); + this.pageSourceProviderFactory = Optional.ofNullable(connectorPageSourceProviderFactory); ConnectorPageSinkProvider connectorPageSinkProvider = null; try { @@ -261,9 +262,9 @@ public Optional<ConnectorSplitManager> getSplitManager() return splitManager; } - public Optional<ConnectorPageSourceProvider> getPageSourceProvider() + public Optional<ConnectorPageSourceProviderFactory> getPageSourceProviderFactory() { - return pageSourceProvider; + return pageSourceProviderFactory; } public Optional<ConnectorPageSinkProvider> getPageSinkProvider() diff --git a/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java b/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java index 9a7c208380ea..6b28967cea00 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/FunctionManager.java @@ -172,7 +172,7 @@ public TableFunctionProcessorProvider getTableFunctionProcessorProvider(TableFun checkArgument(provider != null, "No function provider for catalog: '%s'", catalogHandle); } - return provider.getTableFunctionProcessorProvider(tableFunctionHandle.getFunctionHandle()); + return provider.getTableFunctionProcessorProviderFactory(tableFunctionHandle.getFunctionHandle()).createTableFunctionProcessorProvider(); } private FunctionDependencies getFunctionDependencies(ResolvedFunction resolvedFunction)
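A connector opts into the factory path by overriding the new FunctionProvider hook; a sketch with hypothetical class names, assuming (as in the SPI) that TableFunctionProcessorProvider's methods all have throwing defaults:

    import io.trino.spi.function.FunctionProvider;
    import io.trino.spi.function.table.ConnectorTableFunctionHandle;
    import io.trino.spi.function.table.TableFunctionProcessorProvider;
    import io.trino.spi.function.table.TableFunctionProcessorProviderFactory;

    public class ExampleFunctionProvider
            implements FunctionProvider
    {
        @Override
        public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory(ConnectorTableFunctionHandle functionHandle)
        {
            // each factory call yields a fresh provider, so any state the
            // provider accumulates stays scoped to one query
            return ExampleProcessorProvider::new;
        }

        private static class ExampleProcessorProvider
                implements TableFunctionProcessorProvider
        {
            // a real table function would override getDataProcessor(...) here
        }
    }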
diff --git a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java index 56163b947b1d..1ae76db4c976 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java @@ -43,6 +43,7 @@ import io.trino.spi.type.Type; import io.trino.split.EmptySplit; import io.trino.split.PageSourceProvider; +import io.trino.split.PageSourceProviderFactory; import io.trino.sql.planner.plan.PlanNodeId; import jakarta.annotation.Nullable; @@ -425,7 +426,7 @@ public ScanFilterAndProjectOperatorFactory( int operatorId, PlanNodeId planNodeId, PlanNodeId sourceId, - PageSourceProvider pageSourceProvider, + PageSourceProviderFactory pageSourceProvider, Supplier<CursorProcessor> cursorProcessor, Supplier<PageProcessor> pageProcessor, TableHandle table, @@ -440,13 +441,13 @@ public ScanFilterAndProjectOperatorFactory( this.cursorProcessor = requireNonNull(cursorProcessor, "cursorProcessor is null"); this.pageProcessor = requireNonNull(pageProcessor, "pageProcessor is null"); this.sourceId = requireNonNull(sourceId, "sourceId is null"); - this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); this.table = requireNonNull(table, "table is null"); this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); this.dynamicFilter = dynamicFilter; this.types = requireNonNull(types, "types is null"); this.minOutputPageSize = requireNonNull(minOutputPageSize, "minOutputPageSize is null"); this.minOutputPageRowCount = minOutputPageRowCount; + this.pageSourceProvider = pageSourceProvider.createPageSourceProvider(table.getCatalogHandle()); } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java b/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java index c1fe8d6b4ce7..0a8222da3f29 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/TableScanOperator.java @@ -28,6 +28,7 @@ import io.trino.spi.connector.EmptyPageSource; import io.trino.split.EmptySplit; import io.trino.split.PageSourceProvider; +import io.trino.split.PageSourceProviderFactory; import io.trino.sql.planner.plan.PlanNodeId; import jakarta.annotation.Nullable; @@ -60,7 +61,7 @@ public TableScanOperatorFactory( int operatorId, PlanNodeId planNodeId, PlanNodeId sourceId, - PageSourceProvider pageSourceProvider, + PageSourceProviderFactory pageSourceProvider, TableHandle table, Iterable<ColumnHandle> columns, DynamicFilter dynamicFilter) @@ -68,10 +69,10 @@ public TableScanOperatorFactory( this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); this.sourceId = requireNonNull(sourceId, "sourceId is null"); - this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); this.table = requireNonNull(table, "table is null"); this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null")); this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null"); + this.pageSourceProvider = pageSourceProvider.createPageSourceProvider(table.getCatalogHandle()); } @Override diff --git a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java index 831fd2432b9e..d4b712f017a1 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerMainModule.java @@ -125,7 +125,7 @@ import io.trino.split.PageSinkManager; import io.trino.split.PageSinkProvider; import io.trino.split.PageSourceManager; -import io.trino.split.PageSourceProvider; +import io.trino.split.PageSourceProviderFactory; import io.trino.split.SplitManager; import io.trino.sql.PlannerContext; import io.trino.sql.SqlEnvironmentConfig; @@ -377,7 +377,7 @@ protected void setup(Binder binder) // data stream provider binder.bind(PageSourceManager.class).in(Scopes.SINGLETON); - binder.bind(PageSourceProvider.class).to(PageSourceManager.class).in(Scopes.SINGLETON); + binder.bind(PageSourceProviderFactory.class).to(PageSourceManager.class).in(Scopes.SINGLETON); // page sink provider binder.bind(PageSinkManager.class).in(Scopes.SINGLETON); diff --git a/core/trino-main/src/main/java/io/trino/split/PageSourceManager.java b/core/trino-main/src/main/java/io/trino/split/PageSourceManager.java index b985551cfee5..74ee2189c5f7 100644 --- a/core/trino-main/src/main/java/io/trino/split/PageSourceManager.java +++ b/core/trino-main/src/main/java/io/trino/split/PageSourceManager.java @@ -22,6 +22,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.predicate.TupleDomain; @@ -33,37 +34,56 @@ import static java.util.Objects.requireNonNull; public class PageSourceManager - implements PageSourceProvider + implements PageSourceProviderFactory { - private final CatalogServiceProvider<ConnectorPageSourceProvider> pageSourceProvider; + private final CatalogServiceProvider<ConnectorPageSourceProviderFactory> pageSourceProviderFactory; @Inject - public PageSourceManager(CatalogServiceProvider<ConnectorPageSourceProvider> pageSourceProvider) + public PageSourceManager(CatalogServiceProvider<ConnectorPageSourceProviderFactory> pageSourceProviderFactory) { - this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); + this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null"); } @Override - public ConnectorPageSource createPageSource(Session session, Split split, TableHandle table, List<ColumnHandle> columns, DynamicFilter dynamicFilter) + public PageSourceProvider createPageSourceProvider(CatalogHandle catalogHandle) { - requireNonNull(columns, "columns is null"); - checkArgument(split.getCatalogHandle().equals(table.getCatalogHandle()), "mismatched split and table"); - CatalogHandle catalogHandle = split.getCatalogHandle(); + ConnectorPageSourceProviderFactory provider = pageSourceProviderFactory.getService(catalogHandle); + return new PageSourceProviderInstance(provider.createPageSourceProvider()); + } + + private static class PageSourceProviderInstance + implements PageSourceProvider + { + private final ConnectorPageSourceProvider pageSourceProvider; - ConnectorPageSourceProvider provider = pageSourceProvider.getService(catalogHandle); - TupleDomain<ColumnHandle> constraint = dynamicFilter.getCurrentPredicate(); - if (constraint.isNone()) { - return new EmptyPageSource(); + private PageSourceProviderInstance(ConnectorPageSourceProvider pageSourceProvider) + { + this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); } - if (!isAllowPushdownIntoConnectors(session)) { - dynamicFilter = DynamicFilter.EMPTY; + + @Override + public ConnectorPageSource createPageSource(Session session, + Split split, + TableHandle table, + List<ColumnHandle> columns, + DynamicFilter dynamicFilter) + { + requireNonNull(columns, "columns is null"); + checkArgument(split.getCatalogHandle().equals(table.getCatalogHandle()), "mismatched split and table"); + + TupleDomain<ColumnHandle> constraint = dynamicFilter.getCurrentPredicate(); + if (constraint.isNone()) { + return new EmptyPageSource(); + } + if (!isAllowPushdownIntoConnectors(session)) { + dynamicFilter = DynamicFilter.EMPTY; + } + return pageSourceProvider.createPageSource(table.getTransaction(), + session.toConnectorSession(table.getCatalogHandle()), + split.getConnectorSplit(), + table.getConnectorHandle(), + columns, + dynamicFilter); } - return provider.createPageSource( - table.getTransaction(), - session.toConnectorSession(catalogHandle), - split.getConnectorSplit(), - table.getConnectorHandle(), - columns, - dynamicFilter); } } diff --git a/core/trino-main/src/main/java/io/trino/split/PageSourceProviderFactory.java b/core/trino-main/src/main/java/io/trino/split/PageSourceProviderFactory.java new file mode 100644 index 000000000000..858b3668036f --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/split/PageSourceProviderFactory.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may
not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.split; + +import io.trino.spi.connector.CatalogHandle; + +public interface PageSourceProviderFactory +{ + PageSourceProvider createPageSourceProvider(CatalogHandle catalogHandle); +} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index f64106db4888..b6b0cfb61705 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -178,7 +178,7 @@ import io.trino.spiller.SingleStreamSpillerFactory; import io.trino.spiller.SpillerFactory; import io.trino.split.PageSinkManager; -import io.trino.split.PageSourceProvider; +import io.trino.split.PageSourceManager; import io.trino.sql.DynamicFilters; import io.trino.sql.PlannerContext; import io.trino.sql.gen.ExpressionCompiler; @@ -396,7 +396,7 @@ public class LocalExecutionPlanner private final Metadata metadata; private final IrTypeAnalyzer typeAnalyzer; private final Optional<ExplainAnalyzeContext> explainAnalyzeContext; - private final PageSourceProvider pageSourceProvider; + private final PageSourceManager pageSourceProvider; private final IndexManager indexManager; private final NodePartitioningManager nodePartitioningManager; private final PageSinkManager pageSinkManager; @@ -451,7 +451,7 @@ public LocalExecutionPlanner( PlannerContext plannerContext, IrTypeAnalyzer typeAnalyzer, Optional<ExplainAnalyzeContext> explainAnalyzeContext, - PageSourceProvider pageSourceProvider, + PageSourceManager pageSourceProvider, IndexManager indexManager, NodePartitioningManager nodePartitioningManager, PageSinkManager pageSinkManager, diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java index 5f21d857fac4..44734e5df221 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ExtractSpatialJoins.java @@ -41,6 +41,7 @@ import io.trino.spi.type.TypeManager; import io.trino.spi.type.TypeSignature; import io.trino.split.PageSourceManager; +import io.trino.split.PageSourceProvider; import io.trino.split.SplitManager; import io.trino.split.SplitSource; import io.trino.split.SplitSource.SplitBatch; @@ -471,12 +472,13 @@ private static KdbTree loadKdbTree(String tableName, Session session, Metadata m Optional<KdbTree> kdbTree = Optional.empty(); try (SplitSource splitSource = splitManager.getSplits(session, session.getQuerySpan(), tableHandle, DynamicFilter.EMPTY, alwaysTrue())) { + PageSourceProvider statefulPageSourceProvider = pageSourceManager.createPageSourceProvider(tableHandle.getCatalogHandle()); while (!Thread.currentThread().isInterrupted()) { SplitBatch splitBatch = getFutureValue(splitSource.getNextBatch(1000)); List<Split> splits = splitBatch.getSplits(); for (Split split : splits) { - try (ConnectorPageSource pageSource =
pageSourceManager.createPageSource(session, split, tableHandle, ImmutableList.of(kdbTreeColumn), DynamicFilter.EMPTY)) { + try (ConnectorPageSource pageSource = statefulPageSourceProvider.createPageSource(session, split, tableHandle, ImmutableList.of(kdbTreeColumn), DynamicFilter.EMPTY)) { do { getFutureValue(pageSource.isBlocked()); Page page = pageSource.getNextPage(); diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java index fbcb6b52e380..c97ce540dcba 100644 --- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java +++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java @@ -215,7 +215,7 @@ import static io.trino.connector.CatalogServiceProviderModule.createMaterializedViewPropertyManager; import static io.trino.connector.CatalogServiceProviderModule.createNodePartitioningProvider; import static io.trino.connector.CatalogServiceProviderModule.createPageSinkProvider; -import static io.trino.connector.CatalogServiceProviderModule.createPageSourceProvider; +import static io.trino.connector.CatalogServiceProviderModule.createPageSourceProviderFactory; import static io.trino.connector.CatalogServiceProviderModule.createSchemaPropertyManager; import static io.trino.connector.CatalogServiceProviderModule.createSplitManagerProvider; import static io.trino.connector.CatalogServiceProviderModule.createTableFunctionProvider; @@ -365,7 +365,7 @@ private PlanTester(Session defaultSession, int nodeCountForStats) nodeSchedulerConfig, optimizerConfig)); this.splitManager = new SplitManager(createSplitManagerProvider(catalogManager), tracer, new QueryManagerConfig()); - this.pageSourceManager = new PageSourceManager(createPageSourceProvider(catalogManager)); + this.pageSourceManager = new PageSourceManager(createPageSourceProviderFactory(catalogManager)); this.pageSinkManager = new PageSinkManager(createPageSinkProvider(catalogManager)); this.indexManager = new IndexManager(createIndexProvider(catalogManager)); NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, nodeSchedulerConfig, new NodeTaskMap(finalizerService))); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestingPageSourceProvider.java b/core/trino-main/src/test/java/io/trino/execution/TestingPageSourceProvider.java index fbbd6cde95a8..cbaf188ba0c0 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestingPageSourceProvider.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestingPageSourceProvider.java @@ -20,6 +20,7 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorTableHandle; @@ -34,13 +35,19 @@ import static java.util.Objects.requireNonNull; public class TestingPageSourceProvider - implements ConnectorPageSourceProvider + implements ConnectorPageSourceProviderFactory, ConnectorPageSourceProvider { + @Override + public ConnectorPageSourceProvider createPageSourceProvider() + { + return this; + } + @Override public ConnectorPageSource createPageSource( ConnectorTransactionHandle transaction, diff --git a/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java
b/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java index b17409fbb0e4..12b5ad3bfa3c 100644 --- a/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/BenchmarkScanFilterAndProjectOperator.java @@ -168,7 +168,7 @@ private void createScanFilterAndProjectOperatorFactories(List inputPages, 0, new PlanNodeId("test"), new PlanNodeId("test_source"), - (session, split, table, columns, dynamicFilter) -> new FixedPageSource(inputPages), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(inputPages), () -> cursorProcessor, () -> pageProcessor, TEST_TABLE_HANDLE, diff --git a/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java index 20814258b4bf..0db19374425b 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestScanFilterAndProjectOperator.java @@ -125,7 +125,7 @@ public void testPageSource() 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)), cursorProcessor, pageProcessor, TEST_TABLE_HANDLE, @@ -167,7 +167,7 @@ public void testPageSourceMergeOutput() 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columns, dynamicFilter) -> new FixedPageSource(input), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(input), cursorProcessor, pageProcessor, TEST_TABLE_HANDLE, @@ -212,7 +212,7 @@ public void testPageSourceLazyLoad() 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columns, dynamicFilter) -> new SinglePagePageSource(input), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new SinglePagePageSource(input), cursorProcessor, () -> pageProcessor, TEST_TABLE_HANDLE, @@ -246,7 +246,7 @@ public void testRecordCursorSource() 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(VARCHAR), input)), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(VARCHAR), input)), cursorProcessor, pageProcessor, TEST_TABLE_HANDLE, @@ -297,7 +297,7 @@ public void testPageYield() 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new FixedPageSource(ImmutableList.of(input)), cursorProcessor, pageProcessor, TEST_TABLE_HANDLE, @@ -362,7 +362,7 @@ public void testRecordCursorYield() 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(BIGINT), input)), + (catalog) -> (session, split, table, columns, dynamicFilter) -> new RecordPageSource(new PageRecordSet(ImmutableList.of(BIGINT), input)), cursorProcessor, pageProcessor, TEST_TABLE_HANDLE, diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java b/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java index 
30efe527db58..9a9b6a89b3e2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/Connector.java @@ -75,6 +75,16 @@ default ConnectorPageSourceProvider getPageSourceProvider() throw new UnsupportedOperationException(); } + /** + * Provides a factory that creates a stateful ConnectorPageSourceProvider instance per query. + * If not implemented, a singleton instance returned by getPageSourceProvider is used for all queries. + */ + default ConnectorPageSourceProviderFactory getPageSourceProviderFactory() + { + ConnectorPageSourceProvider pageSourceProvider = getPageSourceProvider(); + return () -> pageSourceProvider; + } + /** * @throws UnsupportedOperationException if this connector does not support reading tables record at a time */ diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSourceProviderFactory.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSourceProviderFactory.java new file mode 100644 index 000000000000..ff3246cfda48 --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorPageSourceProviderFactory.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.spi.connector; + +import java.io.Closeable; + +public interface ConnectorPageSourceProviderFactory + extends Closeable +{ + ConnectorPageSourceProvider createPageSourceProvider(); + + @Override + default void close() {} +} diff --git a/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java b/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java index 7bfbdc3ef7fb..72af7e1860c6 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java +++ b/core/trino-spi/src/main/java/io/trino/spi/function/FunctionProvider.java @@ -16,6 +16,7 @@ import io.trino.spi.Experimental; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.function.table.TableFunctionProcessorProvider; +import io.trino.spi.function.table.TableFunctionProcessorProviderFactory; @Experimental(eta = "2023-03-31") public interface FunctionProvider @@ -43,4 +44,10 @@ default TableFunctionProcessorProvider getTableFunctionProcessorProvider(Connect { throw new UnsupportedOperationException("%s does not provide table functions".formatted(getClass().getName())); } + + default TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory(ConnectorTableFunctionHandle functionHandle) + { + var tableFunctionProvider = getTableFunctionProcessorProvider(functionHandle); + return () -> tableFunctionProvider; + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/function/table/TableFunctionProcessorProviderFactory.java b/core/trino-spi/src/main/java/io/trino/spi/function/table/TableFunctionProcessorProviderFactory.java new file mode 100644 index 000000000000..34413b44596c --- /dev/null +++ b/core/trino-spi/src/main/java/io/trino/spi/function/table/TableFunctionProcessorProviderFactory.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spi.function.table; + +import io.trino.spi.Experimental; + +@Experimental(eta = "2023-07-31") +public interface TableFunctionProcessorProviderFactory +{ + TableFunctionProcessorProvider createTableFunctionProcessorProvider(); +} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProviderFactory.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProviderFactory.java new file mode 100644 index 000000000000..fab67c27e884 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorPageSourceProviderFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
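Since ConnectorPageSourceProviderFactory is the new extension point, a minimal connector-side sketch may help (hypothetical class; the two SPI interfaces and EmptyPageSource are real): state created inside createPageSourceProvider() is scoped to one query, which is the entire point of the factory.

    import io.trino.spi.connector.ColumnHandle;
    import io.trino.spi.connector.ConnectorPageSource;
    import io.trino.spi.connector.ConnectorPageSourceProvider;
    import io.trino.spi.connector.ConnectorPageSourceProviderFactory;
    import io.trino.spi.connector.ConnectorSession;
    import io.trino.spi.connector.ConnectorSplit;
    import io.trino.spi.connector.ConnectorTableHandle;
    import io.trino.spi.connector.ConnectorTransactionHandle;
    import io.trino.spi.connector.DynamicFilter;
    import io.trino.spi.connector.EmptyPageSource;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ExamplePageSourceProviderFactory
            implements ConnectorPageSourceProviderFactory
    {
        @Override
        public ConnectorPageSourceProvider createPageSourceProvider()
        {
            // one provider instance per factory call, so the cache inside it
            // is never shared across queries
            return new ConnectorPageSourceProvider()
            {
                private final Map<String, Object> perQueryCache = new ConcurrentHashMap<>();

                @Override
                public ConnectorPageSource createPageSource(
                        ConnectorTransactionHandle transaction,
                        ConnectorSession session,
                        ConnectorSplit split,
                        ConnectorTableHandle table,
                        List<ColumnHandle> columns,
                        DynamicFilter dynamicFilter)
                {
                    // a real connector would consult perQueryCache here
                    return new EmptyPageSource();
                }
            };
        }
    }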
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.classloader; + +import com.google.inject.Inject; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; + +import static java.util.Objects.requireNonNull; + +public class ClassLoaderSafeConnectorPageSourceProviderFactory + implements ConnectorPageSourceProviderFactory +{ + private final ConnectorPageSourceProviderFactory delegate; + private final ClassLoader classLoader; + + @Inject + public ClassLoaderSafeConnectorPageSourceProviderFactory(@ForClassLoaderSafe ConnectorPageSourceProviderFactory delegate, ClassLoader classLoader) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public ConnectorPageSourceProvider createPageSourceProvider() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return new ClassLoaderSafeConnectorPageSourceProvider(delegate.createPageSourceProvider(), classLoader); + } + } +} diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionProcessorProviderFactory.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionProcessorProviderFactory.java new file mode 100644 index 000000000000..bfbc41824933 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeTableFunctionProcessorProviderFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
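The wrapper above follows the usual ClassLoaderSafe pattern: both the factory call and the provider it returns are pinned to the plugin classloader. A hypothetical wiring sketch inside a plugin's ConnectorFactory (the Iceberg factory change below does exactly this):

    import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProviderFactory;
    import io.trino.spi.connector.ConnectorPageSourceProviderFactory;

    final class PluginWiring
    {
        private PluginWiring() {}

        // 'delegate' is whatever the plugin built internally, e.g. from Guice
        static ConnectorPageSourceProviderFactory classLoaderSafe(ConnectorPageSourceProviderFactory delegate, ClassLoader classLoader)
        {
            // createPageSourceProvider() runs with the plugin classloader, and
            // the provider it returns is wrapped the same way
            return new ClassLoaderSafeConnectorPageSourceProviderFactory(delegate, classLoader);
        }
    }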
+ */ +package io.trino.plugin.base.classloader; + +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.function.table.TableFunctionProcessorProvider; +import io.trino.spi.function.table.TableFunctionProcessorProviderFactory; + +import static java.util.Objects.requireNonNull; + +public final class ClassLoaderSafeTableFunctionProcessorProviderFactory + implements TableFunctionProcessorProviderFactory +{ + private final TableFunctionProcessorProviderFactory delegate; + private final ClassLoader classLoader; + + public ClassLoaderSafeTableFunctionProcessorProviderFactory(TableFunctionProcessorProviderFactory delegate, ClassLoader classLoader) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public TableFunctionProcessorProvider createTableFunctionProcessorProvider() + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + return new ClassLoaderSafeTableFunctionProcessorProvider(delegate.createTableFunctionProcessorProvider(), classLoader); + } + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java index 5d386113f92d..ba1acff6c6f0 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java @@ -596,7 +596,7 @@ public SourceOperator newTableScanOperator(DriverContext driverContext) 0, new PlanNodeId("0"), new PlanNodeId("0"), - (session, split, table, columnHandles, dynamicFilter) -> pageSource, + (catalog) -> (session, split, table, columnHandles, dynamicFilter) -> pageSource, TEST_TABLE_HANDLE, columns.stream().map(ColumnHandle.class::cast).collect(toImmutableList()), DynamicFilter.EMPTY); @@ -618,7 +618,7 @@ public SourceOperator newScanFilterAndProjectOperator(DriverContext driverContex 0, new PlanNodeId("test"), new PlanNodeId("0"), - (session, split, table, columnHandles, dynamicFilter) -> pageSource, + (catalog) -> (session, split, table, columnHandles, dynamicFilter) -> pageSource, cursorProcessor, pageProcessor, TEST_TABLE_HANDLE, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java index 5e9e02d45a81..012b43ce7230 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnector.java @@ -27,7 +27,7 @@ import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; -import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -57,7 +57,7 @@ public class IcebergConnector private final LifeCycleManager lifeCycleManager; private final IcebergTransactionManager transactionManager; private final ConnectorSplitManager splitManager; - private final ConnectorPageSourceProvider pageSourceProvider; + private final ConnectorPageSourceProviderFactory pageSourceProviderFactory; private final 
ConnectorPageSinkProvider pageSinkProvider; private final ConnectorNodePartitioningProvider nodePartitioningProvider; private final List> sessionProperties; @@ -76,7 +76,7 @@ public IcebergConnector( LifeCycleManager lifeCycleManager, IcebergTransactionManager transactionManager, ConnectorSplitManager splitManager, - ConnectorPageSourceProvider pageSourceProvider, + ConnectorPageSourceProviderFactory pageSourceProviderFactory, ConnectorPageSinkProvider pageSinkProvider, ConnectorNodePartitioningProvider nodePartitioningProvider, Set sessionPropertiesProviders, @@ -94,7 +94,7 @@ public IcebergConnector( this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.splitManager = requireNonNull(splitManager, "splitManager is null"); - this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); + this.pageSourceProviderFactory = requireNonNull(pageSourceProviderFactory, "pageSourceProviderFactory is null"); this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null"); this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null"); this.sessionProperties = sessionPropertiesProviders.stream() @@ -133,9 +133,9 @@ public ConnectorSplitManager getSplitManager() } @Override - public ConnectorPageSourceProvider getPageSourceProvider() + public ConnectorPageSourceProviderFactory getPageSourceProviderFactory() { - return pageSourceProvider; + return pageSourceProviderFactory; } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java index 60e694487f29..d11f763b8420 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java @@ -26,7 +26,7 @@ import io.trino.filesystem.manager.FileSystemModule; import io.trino.plugin.base.CatalogName; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider; -import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider; +import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProviderFactory; import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitManager; import io.trino.plugin.base.classloader.ClassLoaderSafeNodePartitioningProvider; import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule; @@ -46,7 +46,7 @@ import io.trino.spi.connector.ConnectorFactory; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; -import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.function.FunctionProvider; @@ -119,7 +119,7 @@ public static Connector createConnector( LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class); IcebergTransactionManager transactionManager = injector.getInstance(IcebergTransactionManager.class); ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class); - ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class); + 
ConnectorPageSourceProviderFactory connectorPageSource = injector.getInstance(ConnectorPageSourceProviderFactory.class); ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class); ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class); Set sessionPropertiesProviders = injector.getInstance(Key.get(new TypeLiteral<>() {})); @@ -139,7 +139,7 @@ public static Connector createConnector( lifeCycleManager, transactionManager, new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), - new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader), + new ClassLoaderSafeConnectorPageSourceProviderFactory(connectorPageSource, classLoader), new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader), new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader), sessionPropertiesProviders, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index 3233df8da532..5917600b3a1c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -43,7 +43,7 @@ import io.trino.plugin.iceberg.procedure.UnregisterTableProcedure; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; -import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.TableProcedureMetadata; import io.trino.spi.function.FunctionProvider; @@ -78,8 +78,8 @@ public void configure(Binder binder) binder.bind(IcebergAnalyzeProperties.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(IcebergSplitManager.class).in(Scopes.SINGLETON); - newOptionalBinder(binder, ConnectorPageSourceProvider.class).setDefault().to(IcebergPageSourceProvider.class).in(Scopes.SINGLETON); - binder.bind(IcebergPageSourceProvider.class).in(Scopes.SINGLETON); + newOptionalBinder(binder, ConnectorPageSourceProviderFactory.class).setDefault().to(IcebergPageSourceProviderFactory.class).in(Scopes.SINGLETON); + binder.bind(IcebergPageSourceProviderFactory.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSinkProvider.class).to(IcebergPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorNodePartitioningProvider.class).to(IcebergNodePartitioningProvider.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 52a8badfaaa0..b31c23b31f32 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -14,16 +14,13 @@ package io.trino.plugin.iceberg; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.VerifyException; +import com.google.common.base.Suppliers; import com.google.common.collect.AbstractIterator; import com.google.common.collect.BiMap; import com.google.common.collect.ImmutableBiMap; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; -import io.airlift.slice.Slice; -import io.trino.annotation.NotThreadSafe; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoInputFile; @@ -53,14 +50,10 @@ import io.trino.plugin.hive.ReaderProjectionsAdapter; import io.trino.plugin.hive.orc.OrcPageSource; import io.trino.plugin.hive.orc.OrcPageSource.ColumnAdaptation; -import io.trino.plugin.hive.orc.OrcReaderConfig; import io.trino.plugin.hive.parquet.ParquetPageSource; -import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.iceberg.IcebergParquetColumnIOConverter.FieldContext; import io.trino.plugin.iceberg.delete.DeleteFile; -import io.trino.plugin.iceberg.delete.DeleteFilter; -import io.trino.plugin.iceberg.delete.EqualityDeleteFilter; -import io.trino.plugin.iceberg.delete.PositionDeleteFilter; +import io.trino.plugin.iceberg.delete.DeleteManager; import io.trino.plugin.iceberg.delete.RowPredicate; import io.trino.plugin.iceberg.fileio.ForwardingInputFile; import io.trino.spi.Page; @@ -79,9 +72,7 @@ import io.trino.spi.connector.FixedPageSource; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; -import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.predicate.ValueSet; import io.trino.spi.type.ArrayType; import io.trino.spi.type.MapType; import io.trino.spi.type.RowType; @@ -94,7 +85,6 @@ import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; -import org.apache.iceberg.StructLike; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.MappedField; @@ -102,10 +92,7 @@ import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.parquet.ParquetSchemaUtil; -import org.apache.iceberg.types.Conversions; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.StructLikeSet; -import org.apache.iceberg.util.StructProjection; +import org.apache.iceberg.util.StructLikeMap; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -114,12 +101,9 @@ import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.roaringbitmap.longlong.LongBitmapDataProvider; -import org.roaringbitmap.longlong.Roaring64Bitmap; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -177,8 +161,6 @@ import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys; import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues; import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; -import static io.trino.plugin.iceberg.delete.EqualityDeleteFilter.readEqualityDeletes; -import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes; import static io.trino.plugin.iceberg.util.OrcIcebergIds.fileColumnsByIcebergId; import static io.trino.plugin.iceberg.util.OrcTypeConverter.ICEBERG_BINARY_TYPE; import static io.trino.plugin.iceberg.util.OrcTypeConverter.ORC_ICEBERG_ID_KEY; @@ -205,8 +187,6 @@ import static 
java.util.stream.Collectors.toUnmodifiableList; import static org.apache.iceberg.FileContent.EQUALITY_DELETES; import static org.apache.iceberg.FileContent.POSITION_DELETES; -import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; -import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; import static org.apache.iceberg.MetadataColumns.ROW_POSITION; import static org.joda.time.DateTimeZone.UTC; @@ -226,20 +206,23 @@ public class IcebergPageSourceProvider private final OrcReaderOptions orcReaderOptions; private final ParquetReaderOptions parquetReaderOptions; private final TypeManager typeManager; + private final Object deleteManagersLock; + private HashMap<Integer, StructLikeMap<DeleteManager>> deleteManagers; + private DeleteManager unpartitionedTableDeleteManager; - @Inject public IcebergPageSourceProvider( IcebergFileSystemFactory fileSystemFactory, FileFormatDataSourceStats fileFormatDataSourceStats, - OrcReaderConfig orcReaderConfig, - ParquetReaderConfig parquetReaderConfig, + OrcReaderOptions orcReaderOptions, + ParquetReaderOptions parquetReaderOptions, TypeManager typeManager) { this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); - this.orcReaderOptions = orcReaderConfig.toOrcReaderOptions(); - this.parquetReaderOptions = parquetReaderConfig.toParquetReaderOptions(); + this.orcReaderOptions = requireNonNull(orcReaderOptions, "orcReaderOptions is null"); + this.parquetReaderOptions = requireNonNull(parquetReaderOptions, "parquetReaderOptions is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); + this.deleteManagersLock = new Object(); } @Override @@ -279,6 +262,7 @@ public ConnectorPageSource createPageSource( split.getPartitionDataJson(), split.getFileFormat(), split.getFileIoProperties(), + split.getDataSequenceNumber(), tableHandle.getNameMappingJson().map(NameMappingParser::fromJson)); } @@ -299,6 +283,7 @@ public ConnectorPageSource createPageSource( String partitionDataJson, IcebergFileFormat fileFormat, Map<String, String> fileIoProperties, + long dataSequenceNumber, Optional<NameMapping> nameMapping) { Set<IcebergColumnHandle> deleteFilterRequiredColumns = requiredColumnsForDeletes(tableSchema, deletes); @@ -395,20 +380,16 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) { .map(readerColumns -> readerColumns.get().stream().map(IcebergColumnHandle.class::cast).collect(toList())) .orElse(requiredColumns); - Supplier<Optional<RowPredicate>> deletePredicate = memoize(() -> { - List<DeleteFilter> deleteFilters = readDeletes( - session, - fileSystem, - tableSchema, - readColumns, - path, - deletes, - readerPageSourceWithRowPositions.getStartRowPosition(), - readerPageSourceWithRowPositions.getEndRowPosition()); - return deleteFilters.stream() - .map(filter -> filter.createPredicate(readColumns)) - .reduce(RowPredicate::and); - }); + Supplier<Optional<RowPredicate>> deletePredicate = + Suppliers.memoize(() -> getDeleteManager(partitionSpec, partitionData).getDeletePredicate(tableSchema, + path, + dataSequenceNumber, + deletes, + readerPageSourceWithRowPositions.getStartRowPosition(), + readerPageSourceWithRowPositions.getEndRowPosition(), + readColumns, + typeManager, + (deleteFile, deleteColumns, tupleDomain) -> openDeletes(session, fileSystem, deleteFile, deleteColumns, tupleDomain))); return new IcebergPageSource( icebergColumns, @@ -418,6 +399,24 @@ else if (identity.getId() == TRINO_MERGE_PARTITION_DATA) { deletePredicate); } + private DeleteManager getDeleteManager(PartitionSpec partitionSpec, PartitionData partitionData) + { + synchronized (deleteManagersLock) { + if (partitionSpec.isUnpartitioned()) { + if (unpartitionedTableDeleteManager == null) { + unpartitionedTableDeleteManager = new DeleteManager(); + } + return unpartitionedTableDeleteManager; + } + if (deleteManagers == null) { + deleteManagers = new HashMap<>(); + } + StructLikeMap<DeleteManager> deleteManagersForPartition = + deleteManagers.computeIfAbsent(partitionSpec.specId(), (key) -> StructLikeMap.create(partitionSpec.partitionType())); + return deleteManagersForPartition.computeIfAbsent(partitionData, partition -> new DeleteManager()); + } + } + private TupleDomain<IcebergColumnHandle> getEffectivePredicate( Schema tableSchema, Map<Integer, Optional<String>> partitionKeys, @@ -457,90 +456,6 @@ else if (deleteFile.content() == EQUALITY_DELETES) { return requiredColumns.build(); } - private List<DeleteFilter> readDeletes( - ConnectorSession session, - TrinoFileSystem fileSystem, - Schema schema, - List<IcebergColumnHandle> readColumns, - String dataFilePath, - List<DeleteFile> deleteFiles, - Optional<Long> startRowPosition, - Optional<Long> endRowPosition) - { - verify(startRowPosition.isPresent() == endRowPosition.isPresent(), "startRowPosition and endRowPosition must be specified together"); - - Slice targetPath = utf8Slice(dataFilePath); - List<DeleteFilter> filters = new ArrayList<>(); - LongBitmapDataProvider deletedRows = new Roaring64Bitmap(); - Map<Set<Integer>, EqualityDeleteSet> deletesSetByFieldIds = new HashMap<>(); - - IcebergColumnHandle deleteFilePath = getColumnHandle(DELETE_FILE_PATH, typeManager); - IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager); - List<IcebergColumnHandle> deleteColumns = ImmutableList.of(deleteFilePath, deleteFilePos); - TupleDomain<IcebergColumnHandle> deleteDomain = TupleDomain.fromFixedValues(ImmutableMap.of(deleteFilePath, NullableValue.of(VARCHAR, targetPath))); - if (startRowPosition.isPresent()) { - Range positionRange = Range.range(deleteFilePos.getType(), startRowPosition.get(), true, endRowPosition.get(), true); - TupleDomain<IcebergColumnHandle> positionDomain = TupleDomain.withColumnDomains(ImmutableMap.of(deleteFilePos, Domain.create(ValueSet.ofRanges(positionRange), false))); - deleteDomain = deleteDomain.intersect(positionDomain); - } - - for (DeleteFile delete : deleteFiles) { - if (delete.content() == POSITION_DELETES) { - if (startRowPosition.isPresent()) { - byte[] lowerBoundBytes = delete.getLowerBounds().get(DELETE_FILE_POS.fieldId()); - Optional<Long> positionLowerBound = Optional.ofNullable(lowerBoundBytes) - .map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes))); - - byte[] upperBoundBytes = delete.getUpperBounds().get(DELETE_FILE_POS.fieldId()); - Optional<Long> positionUpperBound = Optional.ofNullable(upperBoundBytes) - .map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes))); - - if ((positionLowerBound.isPresent() && positionLowerBound.get() > endRowPosition.get()) || - (positionUpperBound.isPresent() && positionUpperBound.get() < startRowPosition.get())) { - continue; - } - } - - try (ConnectorPageSource pageSource = openDeletes(session, fileSystem, delete, deleteColumns, deleteDomain)) { - readPositionDeletes(pageSource, targetPath, deletedRows); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - else if (delete.content() == EQUALITY_DELETES) { - Set<Integer> fieldIds = ImmutableSet.copyOf(delete.equalityFieldIds()); - verify(!fieldIds.isEmpty(), "equality field IDs are missing"); - Schema deleteSchema = TypeUtil.select(schema, fieldIds); - List<IcebergColumnHandle> columns = deleteSchema.columns().stream() - .map(column -> getColumnHandle(column,
typeManager)) - .collect(toImmutableList()); - - EqualityDeleteSet equalityDeleteSet = deletesSetByFieldIds.computeIfAbsent(fieldIds, key -> new EqualityDeleteSet(deleteSchema, schemaFromHandles(readColumns))); - - try (ConnectorPageSource pageSource = openDeletes(session, fileSystem, delete, columns, TupleDomain.all())) { - readEqualityDeletes(pageSource, columns, equalityDeleteSet::add); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - else { - throw new VerifyException("Unknown delete content: " + delete.content()); - } - } - - if (!deletedRows.isEmpty()) { - filters.add(new PositionDeleteFilter(deletedRows)); - } - - for (EqualityDeleteSet equalityDeleteSet : deletesSetByFieldIds.values()) { - filters.add(new EqualityDeleteFilter(equalityDeleteSet::contains)); - } - - return filters; - } - private ConnectorPageSource openDeletes( ConnectorSession session, TrinoFileSystem fileSystem, @@ -1568,12 +1483,12 @@ public Iterable orderedPrefixes() private int prefixLength; @Override - public DereferenceChain computeNext() + public IcebergPageSourceProvider.DereferenceChain computeNext() { if (prefixLength > path.size()) { return endOfData(); } - return new DereferenceChain(baseColumnIdentity, path.subList(0, prefixLength++)); + return new IcebergPageSourceProvider.DereferenceChain(baseColumnIdentity, path.subList(0, prefixLength++)); } }; } @@ -1599,27 +1514,4 @@ public int hashCode() return Objects.hash(baseColumnIdentity, path); } } - - @NotThreadSafe - private static class EqualityDeleteSet - { - private final StructLikeSet deleteSet; - private final StructProjection projection; - - public EqualityDeleteSet(Schema deleteSchema, Schema dataSchema) - { - this.deleteSet = StructLikeSet.create(deleteSchema.asStruct()); - this.projection = StructProjection.create(dataSchema, deleteSchema); - } - - public void add(StructLike row) - { - deleteSet.add(row); - } - - public boolean contains(StructLike row) - { - return deleteSet.contains(projection.wrap(row)); - } - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProviderFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProviderFactory.java new file mode 100644 index 000000000000..6803b8f82efe --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProviderFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
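An aside on the getDeleteManager caching shown earlier: it is keyed by partition spec id plus partition value because Iceberg delete files apply per partition, so every split of one partition can share a single DeleteManager and delete files are parsed once per partition per query. The real code needs Iceberg's StructLikeMap because partition values are StructLike, which does not guarantee usable equals/hashCode; a plain-Java sketch of the same shape with a hypothetical hashable key:

    import java.util.HashMap;
    import java.util.Map;

    import io.trino.plugin.iceberg.delete.DeleteManager;

    final class DeleteManagerCache
    {
        // hypothetical key: spec id plus a canonical JSON rendering of the partition value
        record PartitionKey(int specId, String partitionDataJson) {}

        private final Map<PartitionKey, DeleteManager> deleteManagers = new HashMap<>();

        synchronized DeleteManager deleteManagerFor(PartitionKey key)
        {
            // created lazily, then reused by every split of that partition
            return deleteManagers.computeIfAbsent(key, ignored -> new DeleteManager());
        }
    }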
+ */ +package io.trino.plugin.iceberg; + +import com.google.inject.Inject; +import io.trino.orc.OrcReaderOptions; +import io.trino.parquet.ParquetReaderOptions; +import io.trino.plugin.hive.FileFormatDataSourceStats; +import io.trino.plugin.hive.orc.OrcReaderConfig; +import io.trino.plugin.hive.parquet.ParquetReaderConfig; +import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorPageSourceProviderFactory; +import io.trino.spi.type.TypeManager; + +import static java.util.Objects.requireNonNull; + +public class IcebergPageSourceProviderFactory + implements ConnectorPageSourceProviderFactory +{ + private final IcebergFileSystemFactory fileSystemFactory; + private final FileFormatDataSourceStats fileFormatDataSourceStats; + private final OrcReaderOptions orcReaderOptions; + private final ParquetReaderOptions parquetReaderOptions; + private final TypeManager typeManager; + + @Inject + public IcebergPageSourceProviderFactory( + IcebergFileSystemFactory fileSystemFactory, + FileFormatDataSourceStats fileFormatDataSourceStats, + OrcReaderConfig orcReaderConfig, + ParquetReaderConfig parquetReaderConfig, + TypeManager typeManager) + { + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null"); + this.orcReaderOptions = orcReaderConfig.toOrcReaderOptions(); + this.parquetReaderOptions = parquetReaderConfig.toParquetReaderOptions(); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); + } + + @Override + public ConnectorPageSourceProvider createPageSourceProvider() + { + return new IcebergPageSourceProvider(fileSystemFactory, fileFormatDataSourceStats, orcReaderOptions, parquetReaderOptions, typeManager); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java index d77f0d77b8b6..3f4719937663 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplit.java @@ -48,6 +48,7 @@ public class IcebergSplit private final String partitionDataJson; private final List deletes; private final SplitWeight splitWeight; + private final long dataSequenceNumber; private final Map fileIoProperties; private final List addresses; @@ -63,7 +64,8 @@ public IcebergSplit( @JsonProperty("partitionDataJson") String partitionDataJson, @JsonProperty("deletes") List deletes, @JsonProperty("splitWeight") SplitWeight splitWeight, - @JsonProperty("fileIoProperties") Map fileIoProperties) + @JsonProperty("fileIoProperties") Map fileIoProperties, + @JsonProperty("dataSequenceNumber") long dataSequenceNumber) { this( path, @@ -77,7 +79,8 @@ public IcebergSplit( deletes, splitWeight, fileIoProperties, - ImmutableList.of()); + ImmutableList.of(), + dataSequenceNumber); } public IcebergSplit( @@ -92,7 +95,8 @@ public IcebergSplit( List deletes, SplitWeight splitWeight, Map fileIoProperties, - List addresses) + List addresses, + long dataSequenceNumber) { this.path = requireNonNull(path, "path is null"); this.start = start; @@ -106,6 +110,7 @@ public IcebergSplit( this.splitWeight = requireNonNull(splitWeight, "splitWeight is null"); this.fileIoProperties = ImmutableMap.copyOf(requireNonNull(fileIoProperties, "fileIoProperties is null")); this.addresses = requireNonNull(addresses, "addresses is null"); + 
this.dataSequenceNumber = dataSequenceNumber; } @Override @@ -182,6 +187,12 @@ public SplitWeight getSplitWeight() return splitWeight; } + @JsonProperty + public long getDataSequenceNumber() + { + return dataSequenceNumber; + } + @JsonProperty public Map getFileIoProperties() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index f0ada293c2b0..4cdd140fcf11 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -522,7 +522,8 @@ private IcebergSplit toIcebergSplit(FileScanTask task) .collect(toImmutableList()), SplitWeight.fromProportion(clamp((double) task.length() / tableScan.targetSplitSize(), minimumAssignedSplitWeight, 1.0)), fileIoProperties, - cachingHostAddressProvider.getHosts(task.file().path().toString(), ImmutableList.of())); + cachingHostAddressProvider.getHosts(task.file().path().toString(), ImmutableList.of()), + task.file().dataSequenceNumber()); } private static Domain getPathDomain(TupleDomain effectivePredicate) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java index 17fdcad61e10..f1789daac3ff 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java @@ -47,6 +47,7 @@ public final class DeleteFile private final List equalityFieldIds; private final Map lowerBounds; private final Map upperBounds; + private final long dataSequenceNumber; public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) { @@ -63,7 +64,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile) deleteFile.fileSizeInBytes(), Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of), lowerBounds, - upperBounds); + upperBounds, + deleteFile.dataSequenceNumber()); } @JsonCreator @@ -75,7 +77,8 @@ public DeleteFile( long fileSizeInBytes, List equalityFieldIds, Map lowerBounds, - Map upperBounds) + Map upperBounds, + long dataSequenceNumber) { this.content = requireNonNull(content, "content is null"); this.path = requireNonNull(path, "path is null"); @@ -85,6 +88,7 @@ public DeleteFile( this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null")); this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null")); this.upperBounds = ImmutableMap.copyOf(requireNonNull(upperBounds, "upperBounds is null")); + this.dataSequenceNumber = dataSequenceNumber; } @JsonProperty @@ -135,6 +139,12 @@ public Map getUpperBounds() return upperBounds; } + @JsonProperty + public long getDataSequenceNumber() + { + return dataSequenceNumber; + } + public long getRetainedSizeInBytes() { return INSTANCE_SIZE diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java index b061ec276efc..68ba48f49848 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java @@ -19,5 +19,5 @@ public interface DeleteFilter { - RowPredicate 
createPredicate(List columns); + RowPredicate createPredicate(List columns, long dataSequenceNumber); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java new file mode 100644 index 000000000000..c7913e12a3ea --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java @@ -0,0 +1,248 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.delete; + +import com.google.common.base.VerifyException; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; +import io.trino.plugin.iceberg.IcebergColumnHandle; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.NullableValue; +import io.trino.spi.predicate.Range; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.predicate.ValueSet; +import io.trino.spi.type.TypeManager; +import jakarta.validation.constraints.NotNull; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Conversions; +import org.roaringbitmap.longlong.LongBitmapDataProvider; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle; +import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; +import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static org.apache.iceberg.FileContent.EQUALITY_DELETES; +import static org.apache.iceberg.FileContent.POSITION_DELETES; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; +import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; + +public class DeleteManager +{ + private final ConcurrentHashMap, EqualityDeleteFilter> equalityDeleteFiltersBySchema = new ConcurrentHashMap<>(); + + /** + * @return an optional {@link RowPredicate} that indicates if a row is deleted. + * The resulting predicate is not thread-safe, but it is safe to create a predicate per thread by calling this method on each thread. 
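+ * <p>
+ * A minimal usage sketch (editorial; the {@code split} accessors and the {@code openDeletes} callback
+ * are illustrative names, not part of this change). Each worker thread requests its own predicate and
+ * uses it to filter the pages it reads:
+ * <pre>{@code
+ * Optional<RowPredicate> deletePredicate = deleteManager.getDeletePredicate(
+ *         tableSchema,
+ *         split.getPath(),
+ *         split.getDataSequenceNumber(),
+ *         split.getDeletes(),
+ *         startRowPosition,
+ *         endRowPosition,
+ *         readColumns,
+ *         typeManager,
+ *         this::openDeletes);
+ * Page filtered = deletePredicate.map(predicate -> predicate.filterPage(page)).orElse(page);
+ * }</pre>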
+ */ + public Optional getDeletePredicate(Schema tableSchema, + String path, + long dataSequenceNumber, + List deletes, + Optional startRowPosition, + Optional endRowPosition, + List readColumns, + TypeManager typeManager, + DeletePageSourceProvider deletePageSourceProvider) + { + if (deletes.isEmpty()) { + return Optional.empty(); + } + List deleteFilters = readDeletes( + tableSchema, + path, + deletes, + startRowPosition, + endRowPosition, + typeManager, + deletePageSourceProvider); + + return deleteFilters.stream() + .map(filter -> filter.createPredicate(readColumns, dataSequenceNumber)) + .reduce(EqualityDeleteFilter::combinePredicates); + } + + public interface DeletePageSourceProvider + { + ConnectorPageSource openDeletes(DeleteFile delete, + List deleteColumns, + TupleDomain tupleDomain); + } + + private List readDeletes( + Schema tableSchema, + String dataFilePath, + List deleteFiles, + Optional startRowPosition, + Optional endRowPosition, + TypeManager typeManager, + DeletePageSourceProvider deletePageSourceProvider) + { + verify(startRowPosition.isPresent() == endRowPosition.isPresent(), "startRowPosition and endRowPosition must be specified together"); + + Slice targetPath = utf8Slice(dataFilePath); + List filters = new ArrayList<>(); + LongBitmapDataProvider deletedRows = new Roaring64Bitmap(); + + IcebergColumnHandle deleteFilePath = getColumnHandle(DELETE_FILE_PATH, typeManager); + IcebergColumnHandle deleteFilePos = getColumnHandle(DELETE_FILE_POS, typeManager); + List deleteColumns = ImmutableList.of(deleteFilePath, deleteFilePos); + TupleDomain deleteDomain = TupleDomain.fromFixedValues(ImmutableMap.of(deleteFilePath, NullableValue.of(VARCHAR, targetPath))); + if (startRowPosition.isPresent()) { + Range positionRange = Range.range(deleteFilePos.getType(), startRowPosition.get(), true, endRowPosition.get(), true); + TupleDomain positionDomain = TupleDomain.withColumnDomains(ImmutableMap.of(deleteFilePos, Domain.create(ValueSet.ofRanges(positionRange), false))); + deleteDomain = deleteDomain.intersect(positionDomain); + } + + ArrayList filesToReprocess = new ArrayList<>(); + + for (DeleteFile delete : deleteFiles) { + if (delete.content() == POSITION_DELETES) { + loadPositionDeleteFile(startRowPosition, endRowPosition, deletePageSourceProvider, targetPath, deletedRows, deleteColumns, deleteDomain, delete); + } + else if (delete.content() == EQUALITY_DELETES) { + loadEqualityDeleteFile(delete, tableSchema, typeManager, deletePageSourceProvider, filesToReprocess, false); + } + else { + throw new VerifyException("Unknown delete content: " + delete.content()); + } + } + + for (DeleteFile delete : filesToReprocess) { + loadEqualityDeleteFile(delete, tableSchema, typeManager, deletePageSourceProvider, null, true); + } + + if (!deletedRows.isEmpty()) { + filters.add(new PositionDeleteFilter(deletedRows)); + } + equalityDeleteFiltersBySchema.forEach((k, f) -> filters.add(f)); + + return filters; + } + + private static void loadPositionDeleteFile(Optional startRowPosition, + Optional endRowPosition, + DeletePageSourceProvider deletePageSourceProvider, + Slice targetPath, + LongBitmapDataProvider deletedRows, + List deleteColumns, + TupleDomain deleteDomain, + DeleteFile delete) + { + if (startRowPosition.isPresent()) { + byte[] lowerBoundBytes = delete.getLowerBounds().get(DELETE_FILE_POS.fieldId()); + Optional positionLowerBound = Optional.ofNullable(lowerBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes))); + + byte[] 
upperBoundBytes = delete.getUpperBounds().get(DELETE_FILE_POS.fieldId()); + Optional positionUpperBound = Optional.ofNullable(upperBoundBytes) + .map(bytes -> Conversions.fromByteBuffer(DELETE_FILE_POS.type(), ByteBuffer.wrap(bytes))); + + if ((positionLowerBound.isPresent() && positionLowerBound.get() > endRowPosition.get()) || + (positionUpperBound.isPresent() && positionUpperBound.get() < startRowPosition.get())) { + return; + } + } + + try (ConnectorPageSource pageSource = + deletePageSourceProvider.openDeletes(delete, deleteColumns, deleteDomain)) { + readPositionDeletes(pageSource, targetPath, deletedRows); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void loadEqualityDeleteFile( + DeleteFile delete, + Schema tableSchema, + TypeManager typeManager, + DeletePageSourceProvider deletePageSourceProvider, + ArrayList filesToReprocess, + boolean forceWait) + { + List fieldIds = delete.equalityFieldIds(); + verify(!fieldIds.isEmpty(), "equality field IDs are missing"); + List columns = fieldIds.stream() + .map(id -> getColumnHandle(tableSchema.findField(id), typeManager)) + .collect(toImmutableList()); + var deleteSchema = schemaFromHandles(columns); + EqualityDeleteFilter equalityDeleteFilter = getFilterByDeleteSchema(deleteSchema, delete.equalityFieldIds()); + coordinateDeleteFileLoading(delete, deletePageSourceProvider, filesToReprocess, forceWait, equalityDeleteFilter, columns); + } + + /** + * Loads the delete file while taking into account other splits that may be loading it in parallel. + * A {@link io.trino.plugin.iceberg.delete.EqualityDeleteFilter.FileToken} represents a file being loaded. + * It may exist in multiple sequence numbers so if: + * - We're already loading the file in a newer sequence number, don't load the file here + * - We already loaded the file in an older sequence number, reload the file with the new sequence number + */ + private static void coordinateDeleteFileLoading(DeleteFile delete, + DeletePageSourceProvider deletePageSourceProvider, + ArrayList filesToReprocess, + boolean forceWait, + EqualityDeleteFilter equalityDeleteFilter, + List columns) + { + long deleteFileSequenceNumber = delete.getDataSequenceNumber(); + EqualityDeleteFilter.FileToken fileToken = equalityDeleteFilter.getLoadFileToken(delete.path(), deleteFileSequenceNumber); + boolean shouldLoadNow = fileToken.acquireLoading(); + if (!shouldLoadNow && !forceWait) { + // Files that are currently being loaded by another split, skip them and load another file, return to these files after loading other files to ensure they were loaded successfully + if (fileToken.shouldLoad(deleteFileSequenceNumber)) { + filesToReprocess.add(delete); + } + } + else { + synchronized (fileToken) { + if (fileToken.shouldLoad(deleteFileSequenceNumber)) { + fileToken.setDataVersion(deleteFileSequenceNumber); + try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(delete, columns, TupleDomain.all())) { + equalityDeleteFilter.loadEqualityDeletes(pageSource, columns, deleteFileSequenceNumber); + fileToken.setLoaded(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + finally { + fileToken.releaseLoading(); + } + } + } + } + } + + @NotNull + private EqualityDeleteFilter getFilterByDeleteSchema(Schema deleteSchema, List equalityDeleteFieldIds) + { + return equalityDeleteFiltersBySchema.computeIfAbsent(ImmutableSet.copyOf(equalityDeleteFieldIds), + (s) -> new EqualityDeleteFilter(deleteSchema)); + } +} diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java index fbf5334de2b1..711a99d5ef68 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java @@ -15,53 +15,231 @@ import io.trino.plugin.iceberg.IcebergColumnHandle; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.type.Type; -import org.apache.iceberg.StructLike; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeMap; +import org.apache.iceberg.util.StructProjection; +import java.util.HashMap; import java.util.List; -import java.util.function.Consumer; -import java.util.function.Predicate; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; +import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles; import static java.util.Objects.requireNonNull; public final class EqualityDeleteFilter implements DeleteFilter { - private final Predicate deletedRows; + /** + * Box class for data sequence number to avoid auto-boxing when adding values to the delete map + */ + private static class DataSequenceNumber + { + private long dataSequenceNumber; + + public DataSequenceNumber(long dataSequenceNumber) + { + this.dataSequenceNumber = dataSequenceNumber; + } + } - public EqualityDeleteFilter(Predicate deletedRows) + public static final class FileToken { - this.deletedRows = requireNonNull(deletedRows, "deletedRows is null"); + private long dataVersion; + private boolean isLoaded; + private boolean isLoading; + private final Object lockToken; + + private FileToken(long dataVersion) + { + this.dataVersion = dataVersion; + this.isLoaded = false; + this.isLoading = false; + this.lockToken = new Object(); + } + + public boolean shouldLoad(long deleteDataSequenceNumber) + { + synchronized (lockToken) { + return dataVersion < deleteDataSequenceNumber || (!isLoaded && dataVersion == deleteDataSequenceNumber); + } + } + + public boolean acquireLoading() + { + synchronized (lockToken) { + if (isLoading) { + return false; + } + else { + isLoading = true; + return true; + } + } + } + + public void setDataVersion(long newDataVersion) + { + synchronized (lockToken) { + this.dataVersion = newDataVersion; + } + } + + public void setLoaded() + { + synchronized (lockToken) { + this.isLoaded = true; + } + } + + public void releaseLoading() + { + synchronized (lockToken) { + isLoading = false; + } + } + } + + private final Schema deleteSchema; + private final StructLikeMap deleteMap; + private final ReentrantReadWriteLock readWriteLock; + private final HashMap loadedFiles; + + public EqualityDeleteFilter(Schema deleteSchema) + { + this.deleteSchema = requireNonNull(deleteSchema, "deleteSchema is null"); + this.deleteMap = StructLikeMap.create(deleteSchema.asStruct()); + this.loadedFiles = new HashMap<>(); + this.readWriteLock = new ReentrantReadWriteLock(); + } + + public FileToken getLoadFileToken(String fileName, long deleteFileSequenceNumber) + { + synchronized (loadedFiles) { + return loadedFiles.computeIfAbsent(fileName, k -> new FileToken(deleteFileSequenceNumber)); + } } @Override - public RowPredicate createPredicate(List columns) + public 
RowPredicate createPredicate(List columns, long splitDataSequenceNumber) { Type[] types = columns.stream() .map(IcebergColumnHandle::getType) .toArray(Type[]::new); - return (page, position) -> { - StructLike row = new LazyTrinoRow(types, page, position); - return !deletedRows.test(row); - }; + Schema fileSchema = schemaFromHandles(columns); + boolean hasRequiredColumns = columns.size() >= fileSchema.columns().size(); + for (Types.NestedField column : deleteSchema.columns()) { + hasRequiredColumns = hasRequiredColumns && fileSchema.findField(column.fieldId()) != null; + } + if (!hasRequiredColumns) { + // If we don't have all the required columns this delete filter can't be applied. + // The iceberg page source provider is responsible for making sure that we have all the required columns when a delete filter should be applied + throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, "columns list doesn't contain all equality delete columns"); + } + else { + StructProjection projection = StructProjection.create(fileSchema, deleteSchema); + + RowPredicate predicate = (page, position) -> { + StructProjection wrapped = projection.wrap(new LazyTrinoRow(types, page, position)); + DataSequenceNumber maxDeleteVersion = deleteMap.get(wrapped); + return maxDeleteVersion == null || maxDeleteVersion.dataSequenceNumber <= splitDataSequenceNumber; + }; + return new LockingRowPredicate(predicate, readWriteLock); + } } - public static void readEqualityDeletes(ConnectorPageSource pageSource, List columns, Consumer deletedRows) + public void loadEqualityDeletes(ConnectorPageSource pageSource, + List columns, + long deleteFileSequenceNumber) { Type[] types = columns.stream() .map(IcebergColumnHandle::getType) .toArray(Type[]::new); + DataSequenceNumber boxedDataSequenceNumber = new DataSequenceNumber(deleteFileSequenceNumber); while (!pageSource.isFinished()) { Page page = pageSource.getNextPage(); if (page == null) { continue; } - for (int position = 0; position < page.getPositionCount(); position++) { - deletedRows.accept(new TrinoRow(types, page, position)); + readWriteLock.writeLock().lock(); + try { + for (int position = 0; position < page.getPositionCount(); position++) { + TrinoRow key = new TrinoRow(types, page, position); + DataSequenceNumber existingSequenceNumber = deleteMap.put(key, boxedDataSequenceNumber); + if (existingSequenceNumber != null && existingSequenceNumber.dataSequenceNumber > deleteFileSequenceNumber) { + deleteMap.put(key, existingSequenceNumber); + } + } + } + finally { + readWriteLock.writeLock().unlock(); + } + } + } + + static RowPredicate combinePredicates(RowPredicate first, RowPredicate second) + { + if (first instanceof LockingRowPredicate) { + return combineLockingPredicate((LockingRowPredicate) first, second); + } + else if (second instanceof LockingRowPredicate) { + return combineLockingPredicate((LockingRowPredicate) second, first); + } + return (page, position) -> first.test(page, position) && second.test(page, position); + } + + private static RowPredicate combineLockingPredicate(LockingRowPredicate first, RowPredicate second) + { + return new LockingRowPredicate(combinePredicates(first.underlying, second), first.readWriteLock); + } + + private static class LockingRowPredicate + implements RowPredicate + { + private final RowPredicate underlying; + private final ReentrantReadWriteLock readWriteLock; + + private LockingRowPredicate(RowPredicate underlying, + ReentrantReadWriteLock readWriteLock) + { + this.underlying = underlying; + this.readWriteLock = readWriteLock; + } 
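+
+ // Editorial note on the concurrency scheme, as designed above: loadEqualityDeletes mutates deleteMap
+ // while holding the write lock, and filterPage (below) evaluates the predicate while holding the read
+ // lock, so every page is filtered against a consistent snapshot of the delete map even while other
+ // splits are still loading delete files. The sequence-number check in createPredicate keeps a row
+ // unless a matching delete is strictly newer than the row's data file: an equality delete written at
+ // data sequence number 5 removes matching rows from data files at sequence number 4 or lower, while
+ // matching rows re-inserted at sequence number 5 or later survive.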
+ + @Override + public boolean test(Page page, int position) + { + return underlying.test(page, position); + } + + @Override + public Page filterPage(Page page) + { + readWriteLock.readLock().lock(); + try { + int positionCount = page.getPositionCount(); + int[] retained = new int[positionCount]; + int retainedCount = 0; + for (int position = 0; position < positionCount; position++) { + if (test(page, position)) { + retained[retainedCount] = position; + retainedCount++; + } + } + if (retainedCount == positionCount) { + return page; + } + return page.getPositions(retained, 0, retainedCount); + } + finally { + readWriteLock.readLock().unlock(); } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java index 952f215fb16b..d45e807e5aa8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java @@ -39,7 +39,7 @@ public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows) } @Override - public RowPredicate createPredicate(List columns) + public RowPredicate createPredicate(List columns, long dataSequenceNumber) { int filePosChannel = rowPositionChannel(columns); return (page, position) -> { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java index e02b4834bd4e..3f6c03b69b40 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/RowPredicate.java @@ -15,18 +15,10 @@ import io.trino.spi.Page; -import static java.util.Objects.requireNonNull; - public interface RowPredicate { boolean test(Page page, int position); - default RowPredicate and(RowPredicate other) - { - requireNonNull(other, "other is null"); - return (page, position) -> test(page, position) && other.test(page, position); - } - default Page filterPage(Page page) { int positionCount = page.getPositionCount(); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java index dac202aacf5d..858de96fb885 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/IcebergFunctionProvider.java @@ -14,12 +14,12 @@ package io.trino.plugin.iceberg.functions; import com.google.inject.Inject; -import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProvider; +import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionProcessorProviderFactory; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionHandle; import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProvider; import io.trino.spi.function.FunctionProvider; import io.trino.spi.function.table.ConnectorTableFunctionHandle; -import io.trino.spi.function.table.TableFunctionProcessorProvider; +import io.trino.spi.function.table.TableFunctionProcessorProviderFactory; import static java.util.Objects.requireNonNull; @@ -35,10 +35,10 @@ public IcebergFunctionProvider(TableChangesFunctionProcessorProvider tableChange 
} @Override - public TableFunctionProcessorProvider getTableFunctionProcessorProvider(ConnectorTableFunctionHandle functionHandle) + public TableFunctionProcessorProviderFactory getTableFunctionProcessorProviderFactory(ConnectorTableFunctionHandle functionHandle) { if (functionHandle instanceof TableChangesFunctionHandle) { - return new ClassLoaderSafeTableFunctionProcessorProvider(tableChangesFunctionProcessorProvider, getClass().getClassLoader()); + return new ClassLoaderSafeTableFunctionProcessorProviderFactory(tableChangesFunctionProcessorProvider, getClass().getClassLoader()); } throw new UnsupportedOperationException("Unsupported function: " + functionHandle); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java index d0b07741cb37..52d02049f6dc 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessor.java @@ -128,6 +128,7 @@ else if (column.getId() == DATA_CHANGE_ORDINAL_ID) { split.partitionDataJson(), split.fileFormat(), split.fileIoProperties(), + 0, functionHandle.nameMappingJson().map(NameMappingParser::fromJson)); this.delegateColumnMap = delegateColumnMap; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java index 1121046bff33..c07a3bc82fe7 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/functions/tablechanges/TableChangesFunctionProcessorProvider.java @@ -16,36 +16,53 @@ import com.google.inject.Inject; import io.trino.plugin.base.classloader.ClassLoaderSafeTableFunctionSplitProcessor; import io.trino.plugin.iceberg.IcebergPageSourceProvider; +import io.trino.plugin.iceberg.IcebergPageSourceProviderFactory; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.function.table.ConnectorTableFunctionHandle; import io.trino.spi.function.table.TableFunctionProcessorProvider; +import io.trino.spi.function.table.TableFunctionProcessorProviderFactory; import io.trino.spi.function.table.TableFunctionSplitProcessor; import static java.util.Objects.requireNonNull; public class TableChangesFunctionProcessorProvider - implements TableFunctionProcessorProvider + implements TableFunctionProcessorProviderFactory { - private final IcebergPageSourceProvider icebergPageSourceProvider; + private final IcebergPageSourceProviderFactory icebergPageSourceProviderFactory; @Inject - public TableChangesFunctionProcessorProvider(IcebergPageSourceProvider icebergPageSourceProvider) + public TableChangesFunctionProcessorProvider(IcebergPageSourceProviderFactory icebergPageSourceProviderFactory) { - this.icebergPageSourceProvider = requireNonNull(icebergPageSourceProvider, "icebergPageSourceProvider is null"); + this.icebergPageSourceProviderFactory = requireNonNull(icebergPageSourceProviderFactory, "icebergPageSourceProviderFactory is null"); } @Override - public TableFunctionSplitProcessor 
getSplitProcessor( - ConnectorSession session, - ConnectorTableFunctionHandle handle, - ConnectorSplit split) + public TableFunctionProcessorProvider createTableFunctionProcessorProvider() { - return new ClassLoaderSafeTableFunctionSplitProcessor(new TableChangesFunctionProcessor( - session, - (TableChangesFunctionHandle) handle, - (TableChangesSplit) split, - icebergPageSourceProvider), - getClass().getClassLoader()); + IcebergPageSourceProvider pageSourceProvider = (IcebergPageSourceProvider) icebergPageSourceProviderFactory.createPageSourceProvider(); + return new TableFunctionProcessorProviderInstance(pageSourceProvider); + } + + private static class TableFunctionProcessorProviderInstance + implements TableFunctionProcessorProvider + { + private final IcebergPageSourceProvider icebergPageSourceProvider; + + private TableFunctionProcessorProviderInstance(IcebergPageSourceProvider icebergPageSourceProvider) + { + this.icebergPageSourceProvider = icebergPageSourceProvider; + } + + @Override + public TableFunctionSplitProcessor getSplitProcessor(ConnectorSession session, ConnectorTableFunctionHandle handle, ConnectorSplit split) + { + return new ClassLoaderSafeTableFunctionSplitProcessor(new TableChangesFunctionProcessor( + session, + (TableChangesFunctionHandle) handle, + (TableChangesSplit) split, + icebergPageSourceProvider), + getClass().getClassLoader()); + } } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index bcde49ee1f67..2938d8d40f92 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -142,6 +142,7 @@ public DistributedQueryRunner build() Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data")); queryRunner.installPlugin(new TestingIcebergPlugin(dataDir)); queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties.buildOrThrow()); + schemaInitializer.orElseGet(() -> SchemaInitializer.builder().build()).accept(queryRunner); return queryRunner; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java index 1e96f0ab9d68..157ccda3f097 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java @@ -27,9 +27,20 @@ import io.trino.orc.metadata.statistics.StripeStatistics; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.reader.MetadataReader; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.FileFormatDataSourceStats; +import io.trino.plugin.hive.TrinoViewHiveMetastore; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.parquet.TrinoParquetDataSource; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TestingTypeManager; import io.trino.testing.QueryRunner; +import 
org.apache.iceberg.BaseTable; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -44,7 +55,10 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterators.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; +import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.createPerTransactionCache; import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.testing.TestingConnectorSession.SESSION; public final class IcebergTestUtils { @@ -145,4 +159,26 @@ public static TrinoFileSystemFactory getFileSystemFactory(QueryRunner queryRunne return ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)) .getInjector().getInstance(TrinoFileSystemFactory.class); } + + public static BaseTable loadTable(String tableName, + HiveMetastore metastore, + TrinoFileSystemFactory fileSystemFactory, + String catalogName, + String schemaName) + { + IcebergTableOperationsProvider tableOperationsProvider = new FileMetastoreTableOperationsProvider(fileSystemFactory); + CachingHiveMetastore cachingHiveMetastore = createPerTransactionCache(metastore, 1000); + TrinoCatalog catalog = new TrinoHiveCatalog( + new CatalogName(catalogName), + cachingHiveMetastore, + new TrinoViewHiveMetastore(cachingHiveMetastore, false, "trino-version", "test"), + fileSystemFactory, + new TestingTypeManager(), + tableOperationsProvider, + false, + false, + false, + new IcebergConfig().isHideMaterializedViewStorageTable()); + return (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName(schemaName, tableName)); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java index d1ab8f4483db..b73a4998a65c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java @@ -18,7 +18,7 @@ import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; import io.opentelemetry.api.common.Attributes; -import io.trino.plugin.iceberg.TestIcebergFileOperations.FileType; +import io.trino.plugin.iceberg.util.FileOperationUtils; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import org.intellij.lang.annotations.Language; @@ -32,7 +32,7 @@ import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.DATA; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DATA; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toCollection; @@ -127,12 +127,12 @@ private Multiset getCacheOperations() .collect(toCollection(HashMultiset::create)); } - private record CacheOperation(String operationName, FileType fileType) + private record CacheOperation(String operationName, 
FileOperationUtils.FileType fileType) { public static CacheOperation create(String operationName, Attributes attributes) { String path = requireNonNull(attributes.get(CACHE_FILE_LOCATION)); - return new CacheOperation(operationName, FileType.fromFilePath(path)); + return new CacheOperation(operationName, FileOperationUtils.FileType.fromFilePath(path)); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java index e86fa98936ba..7a047f54ee1e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java @@ -13,19 +13,24 @@ */ package io.trino.plugin.iceberg; -import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multiset; -import io.opentelemetry.sdk.trace.data.SpanData; import io.trino.Session; import io.trino.SystemSessionProperties; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.iceberg.util.FileOperationUtils; import io.trino.plugin.tpch.TpchPlugin; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; +import org.apache.iceberg.Table; import org.apache.iceberg.util.ThreadPools; import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; @@ -33,30 +38,29 @@ import org.junit.jupiter.params.provider.MethodSource; import java.nio.file.Path; -import java.util.List; -import java.util.function.Predicate; +import java.util.Optional; import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset; import static io.trino.SystemSessionProperties.MIN_INPUT_SIZE_PER_TASK; -import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION; import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static io.trino.plugin.iceberg.IcebergSessionProperties.COLLECT_EXTENDED_STATISTICS_ON_WRITE; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.DATA; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.MANIFEST; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.METADATA_JSON; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.METASTORE; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.SNAPSHOT; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.STATS; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.FileType.fromFilePath; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.Scope.ALL_FILES; -import static io.trino.plugin.iceberg.TestIcebergFileOperations.Scope.METADATA_FILES; +import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; +import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileOperation; +import static 
io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DATA; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DELETE; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.MANIFEST; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.SNAPSHOT; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.STATS; +import static io.trino.plugin.iceberg.util.FileOperationUtils.Scope.ALL_FILES; +import static io.trino.plugin.iceberg.util.FileOperationUtils.Scope.METADATA_FILES; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.Math.min; import static java.lang.String.format; -import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toCollection; +import static org.assertj.core.api.Assertions.assertThat; @Execution(ExecutionMode.SAME_THREAD) public class TestIcebergFileOperations @@ -64,6 +68,9 @@ public class TestIcebergFileOperations { private static final int MAX_PREFIXES_COUNT = 10; + private HiveMetastore metastore; + private TrinoFileSystemFactory fileSystemFactory; + @Override protected QueryRunner createQueryRunner() throws Exception @@ -90,6 +97,11 @@ protected QueryRunner createQueryRunner() queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.builder() .put("iceberg.split-manager-threads", "0") .buildOrThrow()); + + metastore = ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)).getInjector() + .getInstance(HiveMetastoreFactory.class) + .createMetastore(Optional.empty()); + queryRunner.installPlugin(new TpchPlugin()); queryRunner.createCatalog("tpch", "tpch"); @@ -98,6 +110,12 @@ protected QueryRunner createQueryRunner() return queryRunner; } + @BeforeAll + public void initFileSystemFactory() + { + fileSystemFactory = getFileSystemFactory(getDistributedQueryRunner()); + } + @Test public void testCreateTable() { @@ -655,13 +673,13 @@ public void testInformationSchemaColumns(int tables) // Pointed lookup assertFileSystemAccesses(session, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA AND table_name = 'test_select_i_s_columns0'", ImmutableMultiset.builder() - .add(new FileOperation(FileType.METADATA_JSON, "InputFile.newStream")) + .add(new FileOperation(METADATA_JSON, "InputFile.newStream")) .build()); // Pointed lookup via DESCRIBE (which does some additional things before delegating to information_schema.columns) assertFileSystemAccesses(session, "DESCRIBE test_select_i_s_columns0", ImmutableMultiset.builder() - .add(new FileOperation(FileType.METADATA_JSON, "InputFile.newStream")) + .add(new FileOperation(METADATA_JSON, "InputFile.newStream")) .build()); for (int i = 0; i < tables; i++) { @@ -778,6 +796,43 @@ public void testSystemMetadataMaterializedViews() assertUpdate("DROP SCHEMA " + schemaName + " CASCADE"); } + @Test + public void testV2TableEnsureEqualityDeleteFilesAreReadOnce() + throws Exception + { + String tableName = "test_equality_deletes_ensure_delete_read_count" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id INT, age INT)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 20), (3, 30)", 2); + // change the schema and do another insert to force at least 2 splits + // use the same ID in both files so the 
delete file doesn't get optimized away by statistics + assertUpdate("INSERT INTO " + tableName + " VALUES (2, 22)", 1); + Table icebergTable = IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "iceberg", "test_schema"); + + // Delete only 1 row in the file so the data file is not pruned completely + writeEqualityDeleteForTable(icebergTable, + fileSystemFactory, + Optional.of(icebergTable.spec()), + Optional.empty(), + ImmutableMap.of("id", 2), + Optional.empty()); + + ImmutableMultiset expectedAccesses = ImmutableMultiset.builder() + .addCopies(new FileOperationUtils.FileOperation(DATA, "InputFile.newInput"), 2) + .addCopies(new FileOperationUtils.FileOperation(DELETE, "InputFile.newInput"), 1) + .build(); + + QueryRunner.MaterializedResultWithPlan queryResult = getDistributedQueryRunner().executeWithPlan(getSession(), "SELECT * FROM " + tableName); + assertThat(queryResult.result().getRowCount()) + .describedAs("query result row count") + .isEqualTo(1); + assertMultisetsEqual( + FileOperationUtils.getOperations(getDistributedQueryRunner().getSpans()).stream() + .filter(operation -> ImmutableSet.of(DATA, DELETE).contains(operation.fileType())) + .collect(toImmutableMultiset()), + expectedAccesses); + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testShowTables() { @@ -789,7 +844,7 @@ private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedAccesses) + private void assertFileSystemAccesses(@Language("SQL") String query, FileOperationUtils.Scope scope, Multiset expectedAccesses) { assertFileSystemAccesses(getSession(), query, scope, expectedAccesses); } @@ -799,24 +854,16 @@ private void assertFileSystemAccesses(Session session, @Language("SQL") String q assertFileSystemAccesses(session, query, METADATA_FILES, expectedAccesses); } - private synchronized void assertFileSystemAccesses(Session session, @Language("SQL") String query, Scope scope, Multiset expectedAccesses) + private synchronized void assertFileSystemAccesses(Session session, @Language("SQL") String query, FileOperationUtils.Scope scope, Multiset expectedAccesses) { getDistributedQueryRunner().executeWithPlan(session, query); assertMultisetsEqual( - getOperations(getDistributedQueryRunner().getSpans()).stream() + FileOperationUtils.getOperations(getDistributedQueryRunner().getSpans()).stream() .filter(scope) .collect(toImmutableMultiset()), expectedAccesses); } - private Multiset getOperations(List spans) - { - return spans.stream() - .filter(span -> span.getName().startsWith("InputFile.") || span.getName().startsWith("OutputFile.")) - .map(span -> new FileOperation(fromFilePath(span.getAttributes().get(FILE_LOCATION)), span.getName())) - .collect(toCollection(HashMultiset::create)); - } - private long getLatestSnapshotId(String tableName) { return (long) computeScalar(format("SELECT snapshot_id FROM \"%s$snapshots\" ORDER BY committed_at DESC FETCH FIRST 1 ROW WITH TIES", tableName)); @@ -829,66 +876,4 @@ private static Session withStatsOnWrite(Session session, boolean enabled) .setCatalogSessionProperty(catalog, COLLECT_EXTENDED_STATISTICS_ON_WRITE, Boolean.toString(enabled)) .build(); } - - private record FileOperation(FileType fileType, String operationType) - { - public FileOperation - { - requireNonNull(fileType, "fileType is null"); - requireNonNull(operationType, "operationType is null"); - } - } - - enum Scope - implements Predicate - { - METADATA_FILES { - @Override - public boolean test(FileOperation fileOperation) - { - return fileOperation.fileType() !=
DATA && fileOperation.fileType() != METASTORE; - } - }, - ALL_FILES { - @Override - public boolean test(FileOperation fileOperation) - { - return fileOperation.fileType() != METASTORE; - } - }, - } - - enum FileType - { - METADATA_JSON, - SNAPSHOT, - MANIFEST, - STATS, - DATA, - METASTORE, - /**/; - - public static FileType fromFilePath(String path) - { - if (path.endsWith("metadata.json")) { - return METADATA_JSON; - } - if (path.contains("/snap-")) { - return SNAPSHOT; - } - if (path.endsWith("-m0.avro")) { - return MANIFEST; - } - if (path.endsWith(".stats")) { - return STATS; - } - if (path.contains("/data/") && (path.endsWith(".orc") || path.endsWith(".parquet"))) { - return DATA; - } - if (path.endsWith(".trinoSchema") || path.contains("/.trinoPermissions/")) { - return METASTORE; - } - throw new IllegalArgumentException("File not recognized: " + path); - } - } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java index 3d86e26b3ba3..19a366568df7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergNodeLocalDynamicSplitPruning.java @@ -147,7 +147,8 @@ public void testDynamicSplitPruningOnUnpartitionedTable() PartitionData.toJson(new PartitionData(new Object[] {})), ImmutableList.of(), SplitWeight.standard(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); String tablePath = inputFile.location().fileName(); TableHandle tableHandle = new TableHandle( @@ -263,7 +264,8 @@ public void testDynamicSplitPruningWithExplicitPartitionFilter() PartitionData.toJson(new PartitionData(new Object[] {dateColumnValue})), ImmutableList.of(), SplitWeight.standard(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); String tablePath = inputFile.location().fileName(); TableHandle tableHandle = new TableHandle( @@ -413,7 +415,8 @@ public void testDynamicSplitPruningWithExplicitPartitionFilterPartitionEvolution PartitionData.toJson(new PartitionData(new Object[] {yearColumnValue})), ImmutableList.of(), SplitWeight.standard(), - ImmutableMap.of()); + ImmutableMap.of(), + 0); String tablePath = inputFile.location().fileName(); // Simulate the situation where `month` column is added at a later phase as partitioning column @@ -512,20 +515,22 @@ private static ConnectorPageSource createTestingPageSource( DynamicFilter dynamicFilter) { FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); - IcebergPageSourceProvider provider = new IcebergPageSourceProvider( + + try (IcebergPageSourceProviderFactory providerFactory = new IcebergPageSourceProviderFactory( new DefaultIcebergFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS)), stats, ORC_READER_CONFIG, PARQUET_READER_CONFIG, - TESTING_TYPE_MANAGER); - - return provider.createPageSource( - transaction, - getSession(icebergConfig), - split, - tableHandle.getConnectorHandle(), - columns, - dynamicFilter); + TESTING_TYPE_MANAGER)) { + var provider = providerFactory.createPageSourceProvider(); + return provider.createPageSource( + transaction, + getSession(icebergConfig), + split, + tableHandle.getConnectorHandle(), + columns, + dynamicFilter); + } } private static TestingConnectorSession getSession(IcebergConfig icebergConfig) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index d89aef8d1a57..ad765de4f952 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -17,19 +17,11 @@ import com.google.common.collect.ImmutableMap; import io.trino.Session; import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.plugin.base.CatalogName; import io.trino.plugin.base.util.Closables; import io.trino.plugin.blackhole.BlackHolePlugin; -import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; -import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; -import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; -import io.trino.plugin.iceberg.catalog.TrinoCatalog; -import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; -import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; -import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; @@ -55,11 +47,11 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.deletes.EqualityDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -81,10 +73,10 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; -import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.createPerTransactionCache; import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory; -import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable; +import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.testing.TestingConnectorSession.SESSION; import static io.trino.testing.TestingNames.randomNameSuffix; @@ -309,6 +301,38 @@ public void testMultipleEqualityDeletes() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testEqualityDeleteAppliesOnlyToCorrectDataVersion() + throws Exception + { + String tableName = "test_multiple_equality_deletes_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + Table icebergTable = loadTable(tableName); + assertThat(icebergTable.currentSnapshot().summary().get("total-equality-deletes")).isEqualTo("0"); + + for (int i = 1; i < 3; i++) { + writeEqualityDeleteToNationTable( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("regionkey", Integer.toUnsignedLong(i))); + } + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE (regionkey != 1L 
AND regionkey != 2L)"); + + // Reinsert the data for regionkey = 1. This should insert the data with a larger data sequence number and the delete file should not apply to it anymore. + // Also delete something again so that the split has deletes and the delete logic is activated. + assertUpdate("INSERT INTO " + tableName + " SELECT * FROM tpch.tiny.nation WHERE regionkey = 1", 5); + writeEqualityDeleteToNationTable( + icebergTable, + Optional.empty(), + Optional.empty(), + ImmutableMap.of("regionkey", Integer.toUnsignedLong(3))); + + assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation WHERE (regionkey != 2L AND regionkey != 3L)"); + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testMultipleEqualityDeletesWithEquivalentSchemas() throws Exception @@ -376,6 +400,34 @@ public void testMultipleEqualityDeletesWithDifferentSchemas() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testEqualityDeletesAcrossPartitions() + throws Exception + { + String tableName = "test_equality_deletes_across_partitions_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['partition']) AS SELECT 'part_1' as partition, * FROM tpch.tiny.nation", 25); + assertUpdate("INSERT INTO " + tableName + " SELECT 'part_2' as partition, * FROM tpch.tiny.nation", 25); + Table icebergTable = loadTable(tableName); + PartitionData partitionData1 = PartitionData.fromJson("{\"partitionValues\":[\"part_1\"]}", new Type[] {Types.StringType.get()}); + PartitionData partitionData2 = PartitionData.fromJson("{\"partitionValues\":[\"part_2\"]}", new Type[] {Types.StringType.get()}); + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.of(icebergTable.spec()), + Optional.of(partitionData1), + ImmutableMap.of("regionkey", 1L), + Optional.of(ImmutableList.of("regionkey"))); + // Delete from both partitions so internal code doesn't skip all deletion logic for the second partition, which would invalidate this test + writeEqualityDeleteToNationTableWithDeleteColumns( + icebergTable, + Optional.of(icebergTable.spec()), + Optional.of(partitionData2), + ImmutableMap.of("regionkey", 2L), + Optional.of(ImmutableList.of("regionkey"))); + + assertQuery("SELECT * FROM " + tableName, "SELECT 'part_1', * FROM nation WHERE regionkey <> 1 UNION ALL SELECT 'part_2', * FROM nation WHERE regionkey <> 2"); + assertUpdate("DROP TABLE " + tableName); + } + @Test public void testMultipleEqualityDeletesWithNestedFields() throws Exception @@ -949,12 +1001,7 @@ private void writeEqualityDeleteToNationTableWithDeleteColumns( Optional> deleteFileColumns) throws Exception { - List deleteColumns = deleteFileColumns.orElse(new ArrayList<>(overwriteValues.keySet())); - Schema deleteRowSchema = icebergTable.schema().select(deleteColumns); - List equalityDeleteFieldIds = deleteColumns.stream() - .map(name -> deleteRowSchema.findField(name).fieldId()) - .collect(toImmutableList()); - writeEqualityDeleteToNationTableWithDeleteColumns(icebergTable, partitionSpec, partitionData, overwriteValues, deleteRowSchema, equalityDeleteFieldIds); + writeEqualityDeleteForTable(icebergTable, fileSystemFactory, partitionSpec, partitionData, overwriteValues, deleteFileColumns); } private void writeEqualityDeleteToNationTableWithDeleteColumns( @@ -966,27 +1013,7 @@ private void writeEqualityDeleteToNationTableWithDeleteColumns( List equalityDeleteFieldIds) throws Exception { - FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION)); - - Parquet.DeleteWriteBuilder writerBuilder =
Parquet.writeDeletes(fileIo.newOutputFile("local:///delete_file_" + UUID.randomUUID())) - .forTable(icebergTable) - .rowSchema(deleteRowSchema) - .createWriterFunc(GenericParquetWriter::buildWriter) - .equalityFieldIds(equalityDeleteFieldIds) - .overwrite(); - if (partitionSpec.isPresent() && partitionData.isPresent()) { - writerBuilder = writerBuilder - .withSpec(partitionSpec.get()) - .withPartition(partitionData.get()); - } - EqualityDeleteWriter writer = writerBuilder.buildEqualityWriter(); - - Record dataDelete = GenericRecord.create(deleteRowSchema); - try (Closeable ignored = writer) { - writer.write(dataDelete.copy(overwriteValues)); - } - - icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit(); + writeEqualityDeleteForTableWithSchema(icebergTable, fileSystemFactory, partitionSpec, partitionData, deleteRowSchema, equalityDeleteFieldIds, overwriteValues); } private Table updateTableToV2(String tableName) @@ -1001,20 +1028,7 @@ private Table updateTableToV2(String tableName) private BaseTable loadTable(String tableName) { - IcebergTableOperationsProvider tableOperationsProvider = new FileMetastoreTableOperationsProvider(fileSystemFactory); - CachingHiveMetastore cachingHiveMetastore = createPerTransactionCache(metastore, 1000); - TrinoCatalog catalog = new TrinoHiveCatalog( - new CatalogName("hive"), - cachingHiveMetastore, - new TrinoViewHiveMetastore(cachingHiveMetastore, false, "trino-version", "test"), - fileSystemFactory, - new TestingTypeManager(), - tableOperationsProvider, - false, - false, - false, - new IcebergConfig().isHideMaterializedViewStorageTable()); - return (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName)); + return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "hive", "tpch"); } private List getActiveFiles(String tableName) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index 9b3275d12393..ca97dc150a32 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -24,6 +24,7 @@ import io.trino.plugin.iceberg.IcebergConnector; import io.trino.plugin.iceberg.TableType; import io.trino.plugin.iceberg.TestingIcebergPlugin; +import io.trino.plugin.iceberg.util.FileOperationUtils.FileOperation; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; @@ -59,8 +60,8 @@ import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.GET_TABLE; import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.GET_TABLES; import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.UPDATE_TABLE; -import static io.trino.plugin.iceberg.catalog.glue.TestIcebergGlueCatalogAccessOperations.FileType.METADATA_JSON; -import static io.trino.plugin.iceberg.catalog.glue.TestIcebergGlueCatalogAccessOperations.FileType.fromFilePath; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.fromFilePath; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; import static 
+        return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "hive", "tpch");
     }
 
     private List<String> getActiveFiles(String tableName)
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
index 9b3275d12393..ca97dc150a32 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java
@@ -24,6 +24,7 @@
 import io.trino.plugin.iceberg.IcebergConnector;
 import io.trino.plugin.iceberg.TableType;
 import io.trino.plugin.iceberg.TestingIcebergPlugin;
+import io.trino.plugin.iceberg.util.FileOperationUtils.FileOperation;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
 import io.trino.testing.QueryRunner;
@@ -59,8 +60,8 @@
 import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.GET_TABLE;
 import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.GET_TABLES;
 import static io.trino.plugin.iceberg.catalog.glue.GlueMetastoreMethod.UPDATE_TABLE;
-import static io.trino.plugin.iceberg.catalog.glue.TestIcebergGlueCatalogAccessOperations.FileType.METADATA_JSON;
-import static io.trino.plugin.iceberg.catalog.glue.TestIcebergGlueCatalogAccessOperations.FileType.fromFilePath;
+import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON;
+import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.fromFilePath;
 import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static io.trino.testing.TestingSession.testSessionBuilder;
@@ -707,43 +708,4 @@ private static Session withStatsOnWrite(Session session, boolean enabled)
             .setCatalogSessionProperty(catalog, COLLECT_EXTENDED_STATISTICS_ON_WRITE, Boolean.toString(enabled))
             .build();
     }
-
-    private record FileOperation(FileType fileType, String operationType)
-    {
-        public FileOperation
-        {
-            requireNonNull(fileType, "fileType is null");
-            requireNonNull(operationType, "operationType is null");
-        }
-    }
-
-    enum FileType
-    {
-        METADATA_JSON,
-        SNAPSHOT,
-        MANIFEST,
-        STATS,
-        DATA,
-        /**/;
-
-        public static FileType fromFilePath(String path)
-        {
-            if (path.endsWith("metadata.json")) {
-                return METADATA_JSON;
-            }
-            if (path.contains("/snap-")) {
-                return SNAPSHOT;
-            }
-            if (path.endsWith("-m0.avro")) {
-                return MANIFEST;
-            }
-            if (path.endsWith(".stats")) {
-                return STATS;
-            }
-            if (path.contains("/data/") && (path.endsWith(".orc") || path.endsWith(".parquet"))) {
-                return DATA;
-            }
-            throw new IllegalArgumentException("File not recognized: " + path);
-        }
-    }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/EqualityDeleteUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/EqualityDeleteUtils.java
new file mode 100644
index 000000000000..af5554e1b495
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/EqualityDeleteUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.util;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.iceberg.PartitionData;
+import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+
+public final class EqualityDeleteUtils
+{
+    private EqualityDeleteUtils()
+    {
+    }
+
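+    /**
+     * Writes a single-row equality delete file for the given table. When {@code deleteFileColumns}
+     * is empty, the delete schema defaults to the keys of {@code overwriteValues}.
+     */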
+    public static void writeEqualityDeleteForTable(
+            Table icebergTable,
+            TrinoFileSystemFactory fileSystemFactory,
+            Optional<PartitionSpec> partitionSpec,
+            Optional<PartitionData> partitionData,
+            Map<String, Object> overwriteValues,
+            Optional<List<String>> deleteFileColumns)
+            throws Exception
+    {
+        List<String> deleteColumns = deleteFileColumns.orElse(new ArrayList<>(overwriteValues.keySet()));
+        Schema deleteRowSchema = icebergTable.schema().select(deleteColumns);
+        List<Integer> equalityDeleteFieldIds = deleteColumns.stream()
+                .map(name -> deleteRowSchema.findField(name).fieldId())
+                .collect(toImmutableList());
+        writeEqualityDeleteForTableWithSchema(icebergTable, fileSystemFactory, partitionSpec, partitionData, deleteRowSchema, equalityDeleteFieldIds, overwriteValues);
+    }
+
+    public static void writeEqualityDeleteForTableWithSchema(
+            Table icebergTable,
+            TrinoFileSystemFactory fileSystemFactory,
+            Optional<PartitionSpec> partitionSpec,
+            Optional<PartitionData> partitionData,
+            Schema deleteRowSchema,
+            List<Integer> equalityDeleteFieldIds,
+            Map<String, Object> overwriteValues)
+            throws Exception
+    {
+        String deleteFileName = "local:///delete_file_" + UUID.randomUUID() + ".parquet";
+        FileIO fileIo = new ForwardingFileIo(fileSystemFactory.create(SESSION));
+
+        Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(fileIo.newOutputFile(deleteFileName))
+                .forTable(icebergTable)
+                .rowSchema(deleteRowSchema)
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .equalityFieldIds(equalityDeleteFieldIds)
+                .overwrite();
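+        // A present spec and partition datum scope the delete file to that one partition;
+        // the tests pass empty optionals for unpartitioned tables, where the delete applies table-wide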
+        if (partitionSpec.isPresent() && partitionData.isPresent()) {
+            writerBuilder = writerBuilder
+                    .withSpec(partitionSpec.get())
+                    .withPartition(partitionData.get());
+        }
+        EqualityDeleteWriter<Record> writer = writerBuilder.buildEqualityWriter();
+
+        Record dataDelete = GenericRecord.create(deleteRowSchema);
+        try (Closeable ignored = writer) {
+            writer.write(dataDelete.copy(overwriteValues));
+        }
+
+        icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java
new file mode 100644
index 000000000000..f90bd9ada3be
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.opentelemetry.sdk.trace.data.SpanData;
+
+import java.util.List;
+import java.util.function.Predicate;
+
+import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION;
+import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DATA;
+import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METASTORE;
+import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.fromFilePath;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toCollection;
+
+public class FileOperationUtils
+{
+    private FileOperationUtils()
+    {
+    }
+
+    // Collects one FileOperation per InputFile/OutputFile span recorded by the tracing file system
+    public static Multiset<FileOperation> getOperations(List<SpanData> spans)
+    {
+        return spans.stream()
+                .filter(span -> span.getName().startsWith("InputFile.") || span.getName().startsWith("OutputFile."))
+                .map(span -> new FileOperation(fromFilePath(span.getAttributes().get(FILE_LOCATION)), span.getName()))
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    public record FileOperation(FileType fileType, String operationType)
+    {
+        public FileOperation
+        {
+            requireNonNull(fileType, "fileType is null");
+            requireNonNull(operationType, "operationType is null");
+        }
+    }
+
+    public enum Scope
+            implements Predicate<FileOperation>
+    {
+        METADATA_FILES {
+            @Override
+            public boolean test(FileOperation fileOperation)
+            {
+                return fileOperation.fileType() != DATA && fileOperation.fileType() != METASTORE;
+            }
+        },
+        ALL_FILES {
+            @Override
+            public boolean test(FileOperation fileOperation)
+            {
+                return fileOperation.fileType() != METASTORE;
+            }
+        },
+    }
+
+    public enum FileType
+    {
+        METADATA_JSON,
+        SNAPSHOT,
+        MANIFEST,
+        STATS,
+        DATA,
+        DELETE,
+        METASTORE,
+        /**/;
+
+        public static FileType fromFilePath(String path)
+        {
+            if (path.endsWith("metadata.json")) {
+                return METADATA_JSON;
+            }
+            if (path.contains("/snap-")) {
+                return SNAPSHOT;
+            }
+            if (path.endsWith("-m0.avro")) {
+                return MANIFEST;
+            }
+            if (path.endsWith(".stats")) {
+                return STATS;
+            }
+            if (path.contains("/data/") && (path.endsWith(".orc") || path.endsWith(".parquet"))) {
+                return DATA;
+            }
+            if (path.contains("delete_file") && (path.endsWith(".orc") || path.endsWith(".parquet"))) {
+                return DELETE;
+            }
+            if (path.endsWith(".trinoSchema") || path.contains("/.trinoPermissions/")) {
+                return METASTORE;
+            }
+            throw new IllegalArgumentException("File not recognized: " + path);
+        }
+    }
+}