diff --git a/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java b/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java index 0dccf39a35ce..a9930d864712 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java +++ b/core/trino-main/src/main/java/io/trino/connector/CatalogPruneTask.java @@ -29,6 +29,7 @@ import io.airlift.log.Logger; import io.airlift.node.NodeInfo; import io.airlift.units.Duration; +import io.trino.metadata.CatalogManager; import io.trino.metadata.ForNodeManager; import io.trino.server.InternalCommunicationConfig; import io.trino.spi.connector.CatalogHandle; @@ -61,7 +62,8 @@ public class CatalogPruneTask private static final JsonCodec> CATALOG_HANDLES_CODEC = listJsonCodec(CatalogHandle.class); private final TransactionManager transactionManager; - private final CoordinatorDynamicCatalogManager catalogManager; + private final CatalogManager catalogManager; + private final ConnectorServicesProvider connectorServicesProvider; private final NodeInfo nodeInfo; private final ServiceSelector selector; private final HttpClient httpClient; @@ -78,7 +80,8 @@ public class CatalogPruneTask @Inject public CatalogPruneTask( TransactionManager transactionManager, - CoordinatorDynamicCatalogManager catalogManager, + CatalogManager catalogManager, + ConnectorServicesProvider connectorServicesProvider, NodeInfo nodeInfo, @ServiceType("trino") ServiceSelector selector, @ForNodeManager HttpClient httpClient, @@ -87,6 +90,7 @@ public CatalogPruneTask( { this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); this.catalogManager = requireNonNull(catalogManager, "catalogManager is null"); + this.connectorServicesProvider = requireNonNull(connectorServicesProvider, "connectorServicesProvider is null"); this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null"); this.selector = requireNonNull(selector, "selector is null"); this.httpClient = requireNonNull(httpClient, "httpClient is null"); @@ -127,7 +131,7 @@ public ThreadPoolExecutorMBean getExecutor() } @VisibleForTesting - void pruneWorkerCatalogs() + public void pruneWorkerCatalogs() { Set online = selector.selectAllServices().stream() .filter(descriptor -> !nodeInfo.getNodeId().equals(descriptor.getNodeId())) @@ -135,6 +139,14 @@ void pruneWorkerCatalogs() // send message to workers to trigger prune List activeCatalogs = getActiveCatalogs(); + pruneWorkerCatalogs(online, activeCatalogs); + + // prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs + connectorServicesProvider.pruneCatalogs(ImmutableSet.of()); + } + + void pruneWorkerCatalogs(Set online, List activeCatalogs) + { for (ServiceDescriptor service : online) { URI uri = getHttpUri(service); if (uri == null) { @@ -163,9 +175,6 @@ public Object handle(Request request, Response response) } }); } - - // prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs - catalogManager.pruneCatalogs(ImmutableSet.of()); } private List getActiveCatalogs() diff --git a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java index 68cef226315e..647c51909c8c 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java @@ -166,6 +166,7 @@ public Optional getCatalog(String catalogName) return Optional.ofNullable(activeCatalogs.get(catalogName)); } + @Override public Set getActiveCatalogs() { return activeCatalogs.values().stream() diff --git a/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java index 3e0277002b0d..9e55435cae73 100644 --- a/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/StaticCatalogManager.java @@ -200,6 +200,13 @@ public Optional getCatalogProperties(CatalogHandle catalogHan return Optional.empty(); } + @Override + public Set getActiveCatalogs() + { + // static catalog manager does not differentiate between active and not. Nor does it need to prune + return ImmutableSet.of(); + } + @Override public ConnectorServices getConnectorServices(CatalogHandle catalogHandle) { diff --git a/core/trino-main/src/main/java/io/trino/connector/WorkerDynamicCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/WorkerDynamicCatalogManager.java index 7403f3d2d14b..5dc7be99dcca 100644 --- a/core/trino-main/src/main/java/io/trino/connector/WorkerDynamicCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/WorkerDynamicCatalogManager.java @@ -101,7 +101,7 @@ public void ensureCatalogsLoaded(Session session, List expect checkArgument(!catalog.getCatalogHandle().equals(GlobalSystemConnector.CATALOG_HANDLE), "Global system catalog not registered"); CatalogConnector newCatalog = catalogFactory.createCatalog(catalog); catalogs.put(catalog.getCatalogHandle(), newCatalog); - log.info("Added catalog: " + catalog.getCatalogHandle()); + log.debug("Added catalog: " + catalog.getCatalogHandle()); } } finally { @@ -142,7 +142,7 @@ public void pruneCatalogs(Set catalogsInUse) } if (!removedCatalogs.isEmpty()) { List sortedHandles = removedCatalogs.stream().map(connector -> connector.getCatalogHandle().toString()).sorted().toList(); - log.info("Pruned catalogs: %s", sortedHandles); + log.debug("Pruned catalogs: %s", sortedHandles); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java index 685d8c2718b1..f4c8fe43ab19 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java @@ -78,7 +78,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import static com.google.common.base.Preconditions.checkArgument; @@ -447,13 +447,14 @@ public VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(Ta return sqlTask.acknowledgeAndGetNewDynamicFilterDomains(currentDynamicFiltersVersion); } - private final ReentrantLock catalogsLock = new ReentrantLock(); + private final ReentrantReadWriteLock catalogsLock = new ReentrantReadWriteLock(); public void pruneCatalogs(Set activeCatalogs) { - catalogsLock.lock(); + Set catalogsInUse = new HashSet<>(activeCatalogs); + ReentrantReadWriteLock.WriteLock pruneLock = catalogsLock.writeLock(); + pruneLock.lock(); try { - Set catalogsInUse = new HashSet<>(activeCatalogs); for (SqlTask task : tasks.asMap().values()) { // add all catalogs being used by a non-done task if (!task.getTaskState().isDone()) { @@ -463,7 +464,7 @@ public void pruneCatalogs(Set activeCatalogs) connectorServicesProvider.pruneCatalogs(catalogsInUse); } finally { - catalogsLock.unlock(); + pruneLock.unlock(); } } @@ -533,7 +534,14 @@ private TaskInfo doUpdateTask( .map(CatalogProperties::getCatalogHandle) .collect(toImmutableSet()); if (sqlTask.setCatalogs(catalogHandles)) { - connectorServicesProvider.ensureCatalogsLoaded(session, activeCatalogs); + ReentrantReadWriteLock.ReadLock catalogInitLock = catalogsLock.readLock(); + catalogInitLock.lock(); + try { + connectorServicesProvider.ensureCatalogsLoaded(session, activeCatalogs); + } + finally { + catalogInitLock.unlock(); + } } }); diff --git a/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java b/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java index cf3538aa72cf..31f9628a29df 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/CatalogManager.java @@ -44,6 +44,12 @@ public Optional getCatalogProperties(CatalogHandle catalogHan return Optional.empty(); } + @Override + public Set getActiveCatalogs() + { + return ImmutableSet.of(); + } + @Override public void createCatalog(String catalogName, ConnectorName connectorName, Map properties, boolean notExists) { @@ -63,6 +69,8 @@ public void dropCatalog(String catalogName, boolean exists) Optional getCatalogProperties(CatalogHandle catalogHandle); + Set getActiveCatalogs(); + void createCatalog(String catalogName, ConnectorName connectorName, Map properties, boolean notExists); void dropCatalog(String catalogName, boolean exists); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragment.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragment.java index 4b51a8edff02..42db56e5812c 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragment.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanFragment.java @@ -356,4 +356,20 @@ public PlanFragment withRoot(PlanNode root) this.languageFunctions, this.jsonRepresentation); } + + public PlanFragment withActiveCatalogs(List activeCatalogs) + { + return new PlanFragment( + this.id, + this.root, + this.symbols, + this.partitioning, + this.partitionCount, + this.partitionedSources, + this.outputPartitioningScheme, + this.statsAndCosts, + activeCatalogs, + this.languageFunctions, + this.jsonRepresentation); + } } diff --git a/core/trino-main/src/test/java/io/trino/connector/TestingLocalCatalogPruneTask.java b/core/trino-main/src/test/java/io/trino/connector/TestingLocalCatalogPruneTask.java new file mode 100644 index 000000000000..ec48bd497d4e --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/connector/TestingLocalCatalogPruneTask.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.connector; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.discovery.client.ServiceDescriptor; +import io.airlift.discovery.client.ServiceSelector; +import io.airlift.http.client.testing.TestingHttpClient; +import io.airlift.node.NodeInfo; +import io.trino.execution.SqlTaskManager; +import io.trino.metadata.CatalogManager; +import io.trino.server.InternalCommunicationConfig; +import io.trino.spi.connector.CatalogHandle; +import io.trino.transaction.TransactionManager; + +import java.util.List; +import java.util.Set; + +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static java.util.Objects.requireNonNull; + +public class TestingLocalCatalogPruneTask + extends CatalogPruneTask +{ + private final SqlTaskManager sqlTaskManagerToPrune; + + public TestingLocalCatalogPruneTask( + TransactionManager transactionManager, + CatalogManager catalogManager, + ConnectorServicesProvider connectorServicesProvider, + NodeInfo nodeInfo, + CatalogPruneTaskConfig catalogPruneTaskConfig, + SqlTaskManager sqlTaskManagerToPrune) + { + super( + transactionManager, + catalogManager, + connectorServicesProvider, + nodeInfo, + new ServiceSelector() { + @Override + public String getType() + { + throw new UnsupportedOperationException("No services to select"); + } + + @Override + public String getPool() + { + throw new UnsupportedOperationException("No pool"); + } + + @Override + public List selectAllServices() + { + return ImmutableList.of(); + } + + @Override + public ListenableFuture> refresh() + { + return immediateFuture(ImmutableList.of()); + } + }, + new TestingHttpClient(request -> { + throw new UnsupportedOperationException("Testing Locl Catalog Prune Task does not make http calls"); + }), + catalogPruneTaskConfig, + new InternalCommunicationConfig()); + this.sqlTaskManagerToPrune = requireNonNull(sqlTaskManagerToPrune, "sqlTaskManagerToPrune is null"); + } + + @Override + void pruneWorkerCatalogs(Set online, List activeCatalogs) + { + sqlTaskManagerToPrune.pruneCatalogs(ImmutableSet.copyOf(activeCatalogs)); + } +} diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java new file mode 100644 index 000000000000..815c8dd85a8d --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManagerRaceWithCatalogPrune.java @@ -0,0 +1,290 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.node.NodeInfo; +import io.airlift.stats.TestingGcMonitor; +import io.airlift.tracing.Tracing; +import io.airlift.units.Duration; +import io.opentelemetry.api.trace.Span; +import io.trino.Session; +import io.trino.connector.CatalogConnector; +import io.trino.connector.CatalogFactory; +import io.trino.connector.CatalogProperties; +import io.trino.connector.CatalogPruneTask; +import io.trino.connector.CatalogPruneTaskConfig; +import io.trino.connector.ConnectorName; +import io.trino.connector.ConnectorServices; +import io.trino.connector.ConnectorServicesProvider; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.TestingLocalCatalogPruneTask; +import io.trino.connector.WorkerDynamicCatalogManager; +import io.trino.exchange.ExchangeManagerRegistry; +import io.trino.execution.buffer.PipelinedOutputBuffers; +import io.trino.execution.executor.RunningSplitInfo; +import io.trino.execution.executor.TaskExecutor; +import io.trino.execution.executor.TaskHandle; +import io.trino.memory.LocalMemoryManager; +import io.trino.memory.NodeMemoryConfig; +import io.trino.metadata.WorkerLanguageFunctionProvider; +import io.trino.spi.connector.CatalogHandle; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.spiller.LocalSpillManager; +import io.trino.spiller.NodeSpillConfig; +import io.trino.sql.planner.PlanFragment; +import io.trino.testing.TestingConnectorContext; +import io.trino.testing.TestingSession; +import io.trino.transaction.NoOpTransactionManager; +import io.trino.transaction.TransactionInfo; +import io.trino.version.EmbedVersion; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; + +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.DoubleSupplier; +import java.util.function.Function; +import java.util.function.Predicate; + +import static io.airlift.tracing.Tracing.noopTracer; +import static io.trino.execution.BaseTestSqlTaskManager.OUT; +import static io.trino.execution.TaskTestUtils.PLAN_FRAGMENT; +import static io.trino.execution.TaskTestUtils.TABLE_SCAN_NODE_ID; +import static io.trino.execution.TaskTestUtils.createTestSplitMonitor; +import static io.trino.execution.TaskTestUtils.createTestingPlanner; +import static io.trino.execution.buffer.PipelinedOutputBuffers.BufferType.PARTITIONED; +import static io.trino.metadata.CatalogManager.NO_CATALOGS; +import static io.trino.spi.connector.CatalogHandle.createRootCatalogHandle; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; + +@TestInstance(PER_CLASS) +@Execution(SAME_THREAD) +public class TestSqlTaskManagerRaceWithCatalogPrune +{ + private static final int NUM_TASKS = 20000; + private static final ConnectorServicesProvider NOOP_CONNECTOR_SERVICES_PROVIDER = new ConnectorServicesProvider() + { + @Override + public void loadInitialCatalogs() {} + + @Override + public void ensureCatalogsLoaded(Session session, List catalogs) {} + + @Override + public void pruneCatalogs(Set catalogsInUse) {} + + @Override + public ConnectorServices getConnectorServices(CatalogHandle catalogHandle) + { + return null; + } + }; + private static final CatalogFactory MOCK_CATALOG_FACTORY = new CatalogFactory() + { + @Override + public void addConnectorFactory(ConnectorFactory connectorFactory, Function duplicatePluginClassLoaderFactory) {} + + @Override + public CatalogConnector createCatalog(CatalogProperties catalogProperties) + { + Connector connector = MockConnectorFactory.create().create(catalogProperties.getCatalogHandle().getCatalogName(), catalogProperties.getProperties(), new TestingConnectorContext()); + ConnectorServices noOpConnectorService = new ConnectorServices( + Tracing.noopTracer(), + catalogProperties.getCatalogHandle(), + connector, + () -> {}); + return new CatalogConnector( + catalogProperties.getCatalogHandle(), + new ConnectorName("mock"), + noOpConnectorService, + noOpConnectorService, + noOpConnectorService, + Optional.of(catalogProperties)); + } + + @Override + public CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorName connectorName, Connector connector) + { + throw new UnsupportedOperationException("Only implement what is needed by worker catalog manager"); + } + }; + private static final TaskExecutor NOOP_TASK_EXECUTOR = new TaskExecutor() { + @Override + public TaskHandle addTask(TaskId taskId, DoubleSupplier utilizationSupplier, int initialSplitConcurrency, Duration splitConcurrencyAdjustFrequency, OptionalInt maxDriversPerTask) + { + return new TaskHandle() { + @Override + public boolean isDestroyed() + { + return false; + } + }; + } + + @Override + public void removeTask(TaskHandle taskHandle) {} + + @Override + public List> enqueueSplits(TaskHandle taskHandle, boolean intermediate, List taskSplits) + { + return ImmutableList.of(); + } + + @Override + public Set getStuckSplitTaskIds(Duration processingDurationThreshold, Predicate filter) + { + return ImmutableSet.of(); + } + + @Override + public void start() {} + + @Override + public void stop() {} + }; + private final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); + private final AtomicInteger sequence = new AtomicInteger(1); + + @AfterAll + public void cleanup() + { + threadPoolExecutor.shutdown(); + } + + @Test + public void testMultipleTaskUpdatesWithMultipleCatalogPrunes() + { + ConnectorServicesProvider workerConnectorServiceProvider = new WorkerDynamicCatalogManager(MOCK_CATALOG_FACTORY); + SqlTaskManager workerTaskManager = getWorkerTaskManagerWithConnectorServiceProvider(workerConnectorServiceProvider); + + CatalogPruneTask catalogPruneTask = new TestingLocalCatalogPruneTask( + new NoInfoTransactionManager(), + NO_CATALOGS, + NOOP_CONNECTOR_SERVICES_PROVIDER, + new NodeInfo("testversion"), + new CatalogPruneTaskConfig(), + workerTaskManager); + + Future catalogTaskFuture = Futures.submit(() -> + { + for (int i = 0; i < NUM_TASKS; i++) { + String catalogName = "catalog_" + i; + CatalogHandle catalogHandle = createRootCatalogHandle(catalogName, new CatalogHandle.CatalogVersion(UUID.randomUUID().toString())); + TaskId taskId = newTaskId(); + workerTaskManager.updateTask( + TestingSession.testSession(), + taskId, + Span.getInvalid(), + Optional.of(fragmentWithCatalog(catalogHandle)), + ImmutableList.of(new SplitAssignment(TABLE_SCAN_NODE_ID, ImmutableSet.of(), true)), + PipelinedOutputBuffers.createInitial(PARTITIONED).withBuffer(OUT, 0).withNoMoreBufferIds(), + ImmutableMap.of(), + false); + try { + Thread.sleep(0, ThreadLocalRandom.current().nextInt(25, 75)); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertDoesNotThrow(() -> workerConnectorServiceProvider.getConnectorServices(catalogHandle)); + workerTaskManager.cancelTask(taskId); + if ((i & 63) == 0) { + workerTaskManager.removeOldTasks(); + } + } + }, threadPoolExecutor); + + Future pruneCatalogsFuture = Futures.submit(() -> + { + for (int i = 0; i < NUM_TASKS; i++) { + catalogPruneTask.pruneWorkerCatalogs(); + try { + Thread.sleep(0, ThreadLocalRandom.current().nextInt(25, 75)); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }, threadPoolExecutor); + + assertDoesNotThrow(() -> catalogTaskFuture.get(2, TimeUnit.MINUTES)); + assertDoesNotThrow(() -> pruneCatalogsFuture.get(2, TimeUnit.MINUTES)); + } + + private TaskId newTaskId() + { + return new TaskId(new StageId("query" + sequence.incrementAndGet(), 0), 1, 0); + } + + private static SqlTaskManager getWorkerTaskManagerWithConnectorServiceProvider(ConnectorServicesProvider workerConnectorServiceProvider) + { + return new SqlTaskManager( + new EmbedVersion("testversion"), + workerConnectorServiceProvider, + createTestingPlanner(), + new WorkerLanguageFunctionProvider(), + new BaseTestSqlTaskManager.MockLocationFactory(), + NOOP_TASK_EXECUTOR, + createTestSplitMonitor(), + new NodeInfo("testversion"), + new LocalMemoryManager(new NodeMemoryConfig()), + new TaskManagementExecutor(), + new TaskManagerConfig().setInfoMaxAge(Duration.ZERO), + new NodeMemoryConfig(), + new LocalSpillManager(new NodeSpillConfig()), + new NodeSpillConfig(), + new TestingGcMonitor(), + noopTracer(), + new ExchangeManagerRegistry(), + ignore -> true); + } + + private static PlanFragment fragmentWithCatalog(CatalogHandle catalogHandle) + { + return PLAN_FRAGMENT.withActiveCatalogs(ImmutableList.of( + new CatalogProperties( + catalogHandle, + new ConnectorName("mock"), + ImmutableMap.of()))); + } + + private static class NoInfoTransactionManager + extends NoOpTransactionManager + { + @Override + public List getAllTransactionInfos() + { + return ImmutableList.of(); + } + } +}