Skip to content

Commit 968f9f3

Browse files
committed
Fix FS exchange manager thread leak
1 parent d0a716f commit 968f9f3

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import com.google.inject.Inject;
18+
import io.airlift.bootstrap.LifeCycleManager;
1819
import io.opentelemetry.api.trace.Tracer;
1920
import io.trino.spi.TrinoException;
2021
import io.trino.spi.exchange.Exchange;
@@ -23,6 +24,7 @@
2324
import io.trino.spi.exchange.ExchangeSink;
2425
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
2526
import io.trino.spi.exchange.ExchangeSource;
27+
import jakarta.annotation.PreDestroy;
2628

2729
import java.net.URI;
2830
import java.util.List;
@@ -40,6 +42,7 @@ public class FileSystemExchangeManager
4042
{
4143
public static final String PATH_SEPARATOR = "/";
4244

45+
private final LifeCycleManager lifeCycleManager;
4346
private final FileSystemExchangeStorage exchangeStorage;
4447
private final FileSystemExchangeStats stats;
4548
private final Tracer tracer;
@@ -57,11 +60,13 @@ public class FileSystemExchangeManager
5760

5861
@Inject
5962
public FileSystemExchangeManager(
63+
LifeCycleManager lifeCycleManager,
6064
FileSystemExchangeStorage exchangeStorage,
6165
FileSystemExchangeStats stats,
6266
FileSystemExchangeConfig fileSystemExchangeConfig,
6367
Tracer tracer)
6468
{
69+
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
6570
this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null");
6671
this.stats = requireNonNull(stats, "stats is null");
6772
this.baseDirectories = ImmutableList.copyOf(requireNonNull(fileSystemExchangeConfig.getBaseDirectories(), "baseDirectories is null"));
@@ -78,6 +83,12 @@ public FileSystemExchangeManager(
7883
this.executor = newCachedThreadPool(daemonThreadsNamed("exchange-source-handles-creation-%s"));
7984
}
8085

86+
@PreDestroy
87+
public void destroy()
88+
{
89+
executor.shutdownNow();
90+
}
91+
8192
@Override
8293
public Exchange createExchange(ExchangeContext context, int outputPartitionCount, boolean preserveOrderWithinPartition)
8394
{
@@ -132,4 +143,10 @@ public boolean supportsConcurrentReadAndWrite()
132143
{
133144
return false;
134145
}
146+
147+
@Override
148+
public void shutdown()
149+
{
150+
lifeCycleManager.stop();
151+
}
135152
}

0 commit comments

Comments
 (0)