Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.airlift.units.Duration;
import io.trino.metadata.CatalogManager;
import io.trino.metadata.ForNodeManager;
import io.trino.server.InternalCommunicationConfig;
import io.trino.spi.connector.CatalogHandle;
Expand Down Expand Up @@ -61,7 +62,8 @@ public class CatalogPruneTask
private static final JsonCodec<List<CatalogHandle>> CATALOG_HANDLES_CODEC = listJsonCodec(CatalogHandle.class);

private final TransactionManager transactionManager;
private final CoordinatorDynamicCatalogManager catalogManager;
private final CatalogManager catalogManager;
private final ConnectorServicesProvider connectorServicesProvider;
private final NodeInfo nodeInfo;
private final ServiceSelector selector;
private final HttpClient httpClient;
Expand All @@ -78,7 +80,8 @@ public class CatalogPruneTask
@Inject
public CatalogPruneTask(
TransactionManager transactionManager,
CoordinatorDynamicCatalogManager catalogManager,
CatalogManager catalogManager,
ConnectorServicesProvider connectorServicesProvider,
NodeInfo nodeInfo,
@ServiceType("trino") ServiceSelector selector,
@ForNodeManager HttpClient httpClient,
Expand All @@ -87,6 +90,7 @@ public CatalogPruneTask(
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
this.connectorServicesProvider = requireNonNull(connectorServicesProvider, "connectorServicesProvider is null");
this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null");
this.selector = requireNonNull(selector, "selector is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
Expand Down Expand Up @@ -127,14 +131,22 @@ public ThreadPoolExecutorMBean getExecutor()
}

@VisibleForTesting
void pruneWorkerCatalogs()
public void pruneWorkerCatalogs()
{
Set<ServiceDescriptor> online = selector.selectAllServices().stream()
.filter(descriptor -> !nodeInfo.getNodeId().equals(descriptor.getNodeId()))
.collect(toImmutableSet());

// send message to workers to trigger prune
List<CatalogHandle> activeCatalogs = getActiveCatalogs();
pruneWorkerCatalogs(online, activeCatalogs);

// prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs
connectorServicesProvider.pruneCatalogs(ImmutableSet.of());
}

void pruneWorkerCatalogs(Set<ServiceDescriptor> online, List<CatalogHandle> activeCatalogs)
{
for (ServiceDescriptor service : online) {
URI uri = getHttpUri(service);
if (uri == null) {
Expand Down Expand Up @@ -163,9 +175,6 @@ public Object handle(Request request, Response response)
}
});
}

// prune all inactive catalogs - we pass an empty set here because manager always retains active catalogs
catalogManager.pruneCatalogs(ImmutableSet.of());
}

private List<CatalogHandle> getActiveCatalogs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public Optional<Catalog> getCatalog(String catalogName)
return Optional.ofNullable(activeCatalogs.get(catalogName));
}

@Override
public Set<CatalogHandle> getActiveCatalogs()
{
return activeCatalogs.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@ public Optional<CatalogProperties> getCatalogProperties(CatalogHandle catalogHan
return Optional.empty();
}

@Override
public Set<CatalogHandle> getActiveCatalogs()
{
// static catalog manager does not differentiate between active and not. Nor does it need to prune
return ImmutableSet.of();
}

@Override
public ConnectorServices getConnectorServices(CatalogHandle catalogHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void ensureCatalogsLoaded(Session session, List<CatalogProperties> expect
checkArgument(!catalog.getCatalogHandle().equals(GlobalSystemConnector.CATALOG_HANDLE), "Global system catalog not registered");
CatalogConnector newCatalog = catalogFactory.createCatalog(catalog);
catalogs.put(catalog.getCatalogHandle(), newCatalog);
log.info("Added catalog: " + catalog.getCatalogHandle());
log.debug("Added catalog: " + catalog.getCatalogHandle());
}
}
finally {
Expand Down Expand Up @@ -142,7 +142,7 @@ public void pruneCatalogs(Set<CatalogHandle> catalogsInUse)
}
if (!removedCatalogs.isEmpty()) {
List<String> sortedHandles = removedCatalogs.stream().map(connector -> connector.getCatalogHandle().toString()).sorted().toList();
log.info("Pruned catalogs: %s", sortedHandles);
log.debug("Pruned catalogs: %s", sortedHandles);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -447,13 +447,14 @@ public VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(Ta
return sqlTask.acknowledgeAndGetNewDynamicFilterDomains(currentDynamicFiltersVersion);
}

private final ReentrantLock catalogsLock = new ReentrantLock();
private final ReentrantReadWriteLock catalogsLock = new ReentrantReadWriteLock();

public void pruneCatalogs(Set<CatalogHandle> activeCatalogs)
{
catalogsLock.lock();
Set<CatalogHandle> catalogsInUse = new HashSet<>(activeCatalogs);
ReentrantReadWriteLock.WriteLock pruneLock = catalogsLock.writeLock();
pruneLock.lock();
try {
Set<CatalogHandle> catalogsInUse = new HashSet<>(activeCatalogs);
for (SqlTask task : tasks.asMap().values()) {
// add all catalogs being used by a non-done task
if (!task.getTaskState().isDone()) {
Expand All @@ -463,7 +464,7 @@ public void pruneCatalogs(Set<CatalogHandle> activeCatalogs)
connectorServicesProvider.pruneCatalogs(catalogsInUse);
}
finally {
catalogsLock.unlock();
pruneLock.unlock();
}
}

Expand Down Expand Up @@ -533,7 +534,14 @@ private TaskInfo doUpdateTask(
.map(CatalogProperties::getCatalogHandle)
.collect(toImmutableSet());
if (sqlTask.setCatalogs(catalogHandles)) {
connectorServicesProvider.ensureCatalogsLoaded(session, activeCatalogs);
ReentrantReadWriteLock.ReadLock catalogInitLock = catalogsLock.readLock();
catalogInitLock.lock();
try {
connectorServicesProvider.ensureCatalogsLoaded(session, activeCatalogs);
}
finally {
catalogInitLock.unlock();
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public Optional<CatalogProperties> getCatalogProperties(CatalogHandle catalogHan
return Optional.empty();
}

@Override
public Set<CatalogHandle> getActiveCatalogs()
{
return ImmutableSet.of();
}

@Override
public void createCatalog(String catalogName, ConnectorName connectorName, Map<String, String> properties, boolean notExists)
{
Expand All @@ -63,6 +69,8 @@ public void dropCatalog(String catalogName, boolean exists)

Optional<CatalogProperties> getCatalogProperties(CatalogHandle catalogHandle);

Set<CatalogHandle> getActiveCatalogs();

void createCatalog(String catalogName, ConnectorName connectorName, Map<String, String> properties, boolean notExists);

void dropCatalog(String catalogName, boolean exists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,20 @@ public PlanFragment withRoot(PlanNode root)
this.languageFunctions,
this.jsonRepresentation);
}

public PlanFragment withActiveCatalogs(List<CatalogProperties> activeCatalogs)
{
return new PlanFragment(
this.id,
this.root,
this.symbols,
this.partitioning,
this.partitionCount,
this.partitionedSources,
this.outputPartitioningScheme,
this.statsAndCosts,
activeCatalogs,
this.languageFunctions,
this.jsonRepresentation);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.connector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.node.NodeInfo;
import io.trino.execution.SqlTaskManager;
import io.trino.metadata.CatalogManager;
import io.trino.server.InternalCommunicationConfig;
import io.trino.spi.connector.CatalogHandle;
import io.trino.transaction.TransactionManager;

import java.util.List;
import java.util.Set;

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.Objects.requireNonNull;

public class TestingLocalCatalogPruneTask
extends CatalogPruneTask
{
private final SqlTaskManager sqlTaskManagerToPrune;

public TestingLocalCatalogPruneTask(
TransactionManager transactionManager,
CatalogManager catalogManager,
ConnectorServicesProvider connectorServicesProvider,
NodeInfo nodeInfo,
CatalogPruneTaskConfig catalogPruneTaskConfig,
SqlTaskManager sqlTaskManagerToPrune)
{
super(
transactionManager,
catalogManager,
connectorServicesProvider,
nodeInfo,
new ServiceSelector() {
@Override
public String getType()
{
throw new UnsupportedOperationException("No services to select");
}

@Override
public String getPool()
{
throw new UnsupportedOperationException("No pool");
}

@Override
public List<ServiceDescriptor> selectAllServices()
{
return ImmutableList.of();
}

@Override
public ListenableFuture<List<ServiceDescriptor>> refresh()
{
return immediateFuture(ImmutableList.of());
}
},
new TestingHttpClient(request -> {
throw new UnsupportedOperationException("Testing Locl Catalog Prune Task does not make http calls");
}),
catalogPruneTaskConfig,
new InternalCommunicationConfig());
this.sqlTaskManagerToPrune = requireNonNull(sqlTaskManagerToPrune, "sqlTaskManagerToPrune is null");
}

@Override
void pruneWorkerCatalogs(Set<ServiceDescriptor> online, List<CatalogHandle> activeCatalogs)
{
sqlTaskManagerToPrune.pruneCatalogs(ImmutableSet.copyOf(activeCatalogs));
}
}
Loading