Support exchange spooling on GCS #12360
...e-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeModule.java
...system/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java
This will need docs. Please work with @colebow on adding this.
e02cbc1 to 75ff0a5
This can in theory grow indefinitely. In FileSystemExchange#close we call deleteRecursively in a loop for each task. This may result in huge spikes in the number of threads (hundreds or even thousands). I would recommend going with a bounded executor with the number of threads set to the desired concurrency, e.g.:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        maximumConcurrency,
        maximumConcurrency,
        10, SECONDS,
        new LinkedBlockingQueue<>(),
        threadsNamed("gcs-delete-%s"));
I don't know what value we want to pick for max concurrency though, maybe 50? 100?
Also, it is generally a good idea to call executor.allowCoreThreadTimeOut(true) to let inactive threads be reclaimed after a spike.
It might also be reasonable to improve FileSystemExchange#close to batch delete requests across multiple partitions.
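For readers who want to try the suggestion in isolation, here is a minimal sketch of the bounded executor described above, using only the JDK. The class name `BoundedDeleteExecutor`, the `create` helper, and the hand-written thread factory are assumptions for illustration (the factory stands in for `threadsNamed`); this is not the code that ended up in the PR.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, not part of Trino
final class BoundedDeleteExecutor
{
    private BoundedDeleteExecutor() {}

    // Builds a pool that never runs more than maximumConcurrency threads.
    // Extra tasks wait in the unbounded queue instead of spawning new threads.
    static ThreadPoolExecutor create(int maximumConcurrency)
    {
        AtomicInteger counter = new AtomicInteger();
        // Plain-JDK stand-in (assumption) for a named-thread factory
        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable, "gcs-delete-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                maximumConcurrency,
                maximumConcurrency,
                10, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                threadFactory);
        // Let idle core threads exit after the keep-alive time, so the pool shrinks after a spike
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

Setting the core and maximum sizes to the same value matters here: with an unbounded queue, a `ThreadPoolExecutor` never grows beyond its core size, so the core size is the effective concurrency limit.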
Yeah, I think it's better to batch delete requests. Changing it right now.
With batching, I think directly using cachedExecutor will be sufficient.
I have changed it to batch the deletes, so as to minimize API calls to GCS. I wonder if we should do the same for Azure and S3. Currently we delete one task output directory at a time. In theory, we can do something similar to GCS: collect all the objects into a list and batch delete them. @arhimondr @losipiuk
Would make sense IMO. Good catch.
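The "collect all the objects into a list and batch delete them" idea can be sketched roughly as follows. Everything here is hypothetical: `BatchedDeletes`, `deleteInBatches`, the object names, and the `Consumer` that stands in for one storage API call. It does not use the GCS, S3, or Azure client libraries and only shows the chunking step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration, not the PR's implementation
final class BatchedDeletes
{
    private BatchedDeletes() {}

    // Splits the collected object names into chunks of at most batchSize and
    // passes each chunk to deleteBatch (a stand-in for one storage API call).
    // Returns the number of calls made.
    static int deleteInBatches(List<String> objectNames, int batchSize, Consumer<List<String>> deleteBatch)
    {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int calls = 0;
        for (int start = 0; start < objectNames.size(); start += batchSize) {
            int end = Math.min(start + batchSize, objectNames.size());
            deleteBatch.accept(new ArrayList<>(objectNames.subList(start, end)));
            calls++;
        }
        return calls;
    }

    public static void main(String[] args)
    {
        // Gather objects from several made-up task output directories first
        List<String> objects = new ArrayList<>();
        for (int task = 0; task < 3; task++) {
            for (int file = 0; file < 4; file++) {
                objects.add("exchange/task-" + task + "/file-" + file);
            }
        }
        int calls = deleteInBatches(objects, 5, batch -> System.out.println("deleting " + batch.size() + " objects"));
        System.out.println(calls + " calls");
    }
}
```

Deleting per directory would cost at least one call per task here; pooling the names first means the number of calls depends only on the total object count and the batch size.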
Addressed comments. As for batching deletes for S3 and Azure, I decided to do it in a separate PR. It's a bit more complex than I thought, as I need to deal with multiple buckets.
nit: one parameter per line, static import listeningDecorator
Also set the core pool size to 100; otherwise it will keep running only a single thread until the queue is full.
Ah, you are right. I'm using a SynchronousQueue here, which effectively has a capacity of 0, so if concurrent tasks exceed 100, submissions will be rejected. Your suggestion above is better.
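The difference between the two queue choices can be reproduced with a small JDK-only experiment. This is a sketch under assumptions: `QueueChoiceDemo` and `lastTaskRejected` are made-up names, and the limit is 2 rather than 100 to keep it small. It fills the pool with blocked tasks and checks whether one more submission is rejected.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Trino
final class QueueChoiceDemo
{
    private QueueChoiceDemo() {}

    // Occupies `limit` threads with blocked tasks, then submits one more task
    // and reports whether that extra submission was rejected.
    static boolean lastTaskRejected(int corePoolSize, int limit, BlockingQueue<Runnable> queue)
    {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, limit, 10, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            for (int i = 0; i < limit; i++) {
                executor.execute(blocked);
            }
            try {
                executor.execute(blocked);
                return false;
            }
            catch (RejectedExecutionException e) {
                return true;
            }
        }
        finally {
            // Unblock the running tasks and let the pool wind down
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args)
    {
        // SynchronousQueue holds no tasks, so the extra task is rejected once all threads are busy
        System.out.println(lastTaskRejected(0, 2, new SynchronousQueue<>()));
        // core == max with an unbounded queue: the extra task simply waits in the queue
        System.out.println(lastTaskRejected(2, 2, new LinkedBlockingQueue<>()));
    }
}
```

The rejection in the first case comes from the default `AbortPolicy`; a different rejection handler would change what happens to the extra task, but not the fact that it cannot be queued.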
Description
New feature.
trino-exchange-filesystem
This PR adds support for exchange spooling on GCS. GCS is mostly S3-compatible, except for two minor incompatibilities.
An example exchange-manager.properties:
Related issues, pull requests, and links
Documentation
( ) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
(x) Documentation issue #issuenumber is filed, and can be handled later.
#12467
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: