Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.connector;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
Expand All @@ -26,6 +25,7 @@
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.ConnectorName;
import io.trino.util.AutoCloseableCloser;
import jakarta.annotation.PreDestroy;

import java.util.ArrayList;
Expand All @@ -39,15 +39,16 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;

@ThreadSafe
public class WorkerDynamicCatalogManager
Expand All @@ -61,9 +62,9 @@ public class WorkerDynamicCatalogManager
private final Lock catalogLoadingLock = catalogsLock.readLock();
private final Lock catalogRemovingLock = catalogsLock.writeLock();
private final ConcurrentMap<CatalogHandle, CatalogConnector> catalogs = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("worker-dynamic-catalog-manager-%s"));

@GuardedBy("catalogsUpdateLock")
@GuardedBy("catalogsLock")
private boolean stopped;

@Inject
Expand All @@ -74,25 +75,24 @@ public WorkerDynamicCatalogManager(CatalogFactory catalogFactory)

@PreDestroy
public void stop()
throws Exception
{
List<CatalogConnector> catalogs;
try (AutoCloseableCloser closer = AutoCloseableCloser.create()) {
catalogRemovingLock.lock();
try {
if (stopped) {
return;
}
stopped = true;

catalogRemovingLock.lock();
try {
if (stopped) {
return;
catalogs.values().forEach(catalog -> closer.register(catalog::shutdown));
catalogs.clear();
}
finally {
catalogRemovingLock.unlock();
}
stopped = true;

catalogs = ImmutableList.copyOf(this.catalogs.values());
this.catalogs.clear();
}
finally {
catalogRemovingLock.unlock();
}

for (CatalogConnector connector : catalogs) {
connector.shutdown();
closer.register(executor::shutdownNow);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand All @@ -71,6 +70,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.toOptional;
import static com.google.common.collect.Sets.difference;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.DataSize.succinctBytes;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.ExceededMemoryLimitException.exceededGlobalTotalLimit;
Expand All @@ -86,13 +86,14 @@
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

public class ClusterMemoryManager
{
private static final Logger log = Logger.get(ClusterMemoryManager.class);
private static final String EXPORTED_POOL_NAME = "general";

private final ExecutorService listenerExecutor = Executors.newSingleThreadExecutor();
private final ExecutorService listenerExecutor = newSingleThreadExecutor(daemonThreadsNamed("cluster-memory-manager-listener-%s"));
private final ClusterMemoryLeakDetector memoryLeakDetector = new ClusterMemoryLeakDetector();
private final InternalNodeManager nodeManager;
private final LocationFactory locationFactory;
Expand Down