[WIP] Implement exchange spooling#10376

Closed
linzebing wants to merge 14 commits into trinodb:master from linzebing:exchange-spooling

@linzebing
Member

@linzebing linzebing commented Dec 21, 2021

This PR adds the trino-exchange plugin to Trino, which contains a local file system implementation as well as an S3-based implementation.

@kokosing
Member

Are "spooling" and "spilling" different things? Do we reuse one for the other?

@losipiuk
Member

> Are "spooling" and "spilling" different things? Do we reuse one for the other?

Different, and not very related. Spooling is a single-word name for the implementation of persistent exchange buffers (a concept added in the task-level retries PR), which dump data to an external filesystem.

@linzebing
Member Author

I realized that it might be better to implement parallel reads using a producer-consumer model instead of async reads via double buffering. I will test both.
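For readers unfamiliar with the producer-consumer approach mentioned here, a minimal sketch follows. It is not the code from this PR: `ParallelReadSketch`, `readFile`, the queue capacity of 4, and the end-of-stream sentinel are all hypothetical, and `readFile` merely stands in for fetching one spooled file from local disk or S3. Several reader threads (producers) push chunks into a bounded queue while a single consumer drains it, so the bound on the queue caps how much data is buffered at once.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelReadSketch
{
    // Sentinel a producer enqueues when it has no more files (hypothetical convention)
    private static final byte[] END = new byte[0];

    // Stand-in for reading one spooled file; here it just returns the name as bytes
    static byte[] readFile(String name)
    {
        return name.getBytes(StandardCharsets.UTF_8);
    }

    static List<String> readAll(List<String> files, int readerThreads)
            throws InterruptedException
    {
        // Bounded queue: producers block when the consumer falls behind
        BlockingQueue<byte[]> chunks = new ArrayBlockingQueue<>(4);
        Queue<String> pending = new ConcurrentLinkedQueue<>(files);
        ExecutorService executor = Executors.newFixedThreadPool(readerThreads);
        try {
            for (int i = 0; i < readerThreads; i++) {
                executor.submit(() -> {
                    try {
                        String file;
                        while ((file = pending.poll()) != null) {
                            chunks.put(readFile(file));
                        }
                    }
                    finally {
                        chunks.put(END); // signal that this producer is done
                    }
                    return null;
                });
            }

            // Consumer: drain until every producer has signalled completion
            List<String> result = new ArrayList<>();
            int finishedProducers = 0;
            while (finishedProducers < readerThreads) {
                byte[] chunk = chunks.take();
                if (chunk == END) {
                    finishedProducers++;
                }
                else {
                    result.add(new String(chunk, StandardCharsets.UTF_8));
                }
            }
            return result;
        }
        finally {
            executor.shutdown();
        }
    }
}
```

The order in which chunks arrive depends on thread scheduling, so a caller that needs ordering would have to tag chunks with their source. Double buffering, by contrast, keeps a single reader but overlaps the next read with processing of the current buffer.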

@losipiuk
Member

Thanks. Looks good. I did not reread everything. I assume the changes in the old commits were (mostly) addressing comments. Please point me to a specific piece of code if you feel I should look at it. It would be nice if GitHub exposed a force-push timeline for PRs and allowed getting a diff of all the changes since my last review.
We had something like that at my previous job and it worked great.

@arhimondr
Contributor

Resolves #9936

@arhimondr
Contributor

@linzebing

> The S3 minimum upload buffer size is 5MB. When running MinIO-based tests for suites like AbstractTestFaultTolerantExecutionAggregations, a few tests throw out-of-memory errors because they run out of heap space.

Running out of heap space usually indicates a memory accounting problem.

Also, the number of partitions configured for those tests is only 5: 9a5c5d7#diff-c3957ed97dd34166ebb4937df48e597924e5c1345b2668f807014f834154a4d9R28. A single task shouldn't allocate more than 5MB * 5 * 2 = 50MB of buffer space. The number of nodes in the cluster is set to 4. With fault-tolerant execution enabled, only a single task per query runs on a node, so in theory the buffer space for a single query shouldn't exceed ~200MB. With two queries running concurrently it gets to 400MB. In theory that could cause additional memory pressure, but it does not seem very likely. I would recommend taking a heap dump (tests are configured to generate a heap dump automatically when running out of heap) to check that everything is properly accounted for and there are no apparent leaks.
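The estimate above can be reproduced as a small calculation. This is only a sketch: the class and method names are hypothetical, and the factor of 2 is read here as two upload buffers per partition, which is an assumption since the comment does not say what the factor stands for.

```java
final class UploadBufferEstimate
{
    // 5MB minimum upload buffer size, as stated in the discussion
    static final long MIN_BUFFER_MB = 5;

    // Upper bound for one task: one set of buffers per partition
    static long perTaskMb(int partitions, int buffersPerPartition)
    {
        return MIN_BUFFER_MB * partitions * buffersPerPartition;
    }

    // One task per query per node, so a query uses at most perTask * nodes
    static long perQueryMb(int partitions, int buffersPerPartition, int nodes)
    {
        return perTaskMb(partitions, buffersPerPartition) * nodes;
    }

    // Several queries running at the same time
    static long totalMb(int partitions, int buffersPerPartition, int nodes, int concurrentQueries)
    {
        return perQueryMb(partitions, buffersPerPartition, nodes) * concurrentQueries;
    }
}
```

With 5 partitions, 2 buffers per partition, 4 nodes, and 2 concurrent queries, these methods give 50MB per task, 200MB per query, and 400MB in total, matching the figures in the comment.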

Also, I found a different memory issue related to S3 (#10464) and I disabled streaming upload for MinIO-based tests for now (b555618). I wonder if it could be somehow related?

@linzebing
Member Author

@arhimondr I will try to get a heap dump

@losipiuk
Member

> Also, I found a different memory issue related to S3 (#10464) and I disabled streaming upload for MinIO-based tests for now (b555618). I wonder if it could be somehow related?

Oh, so this is the reason the test I ran for #10432 stopped failing with OOM today after I rebased :D.


try {
List<CompletedPart> completedParts = uploadFutures.stream()
.map(CompletableFuture::join)
Contributor

This will cause the close method to block for a potentially long time, as it has to wait for all parts to finish uploading. Judging by the call stack, this path is currently not designed to block for long: the call to ExchangeSink#finish is made from OutputBuffer#noMorePages, which is executed on a rather tiny executor that can easily run out of threads.

This is actually my mistake. After carefully looking at the implementation of the other output buffers, it looks like ExchangeSink#finish and ExchangeSink#abort should be asynchronous and return a Future. I'm going to address this issue in the main PR.
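As an illustration of the non-blocking shape suggested here, the part-upload futures can be combined into a single future instead of being joined inside the calling thread. This is a sketch under assumptions, not the actual ExchangeSink API: `AsyncFinishSketch`, its `finish` method, and the use of `String` in place of `CompletedPart` are hypothetical, and it relies only on the JDK's `CompletableFuture`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class AsyncFinishSketch
{
    // Returns immediately; the result completes once every part upload has completed
    static CompletableFuture<List<String>> finish(List<CompletableFuture<String>> uploadFutures)
    {
        return CompletableFuture.allOf(uploadFutures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> uploadFutures.stream()
                        // join() does not block here: allOf guarantees all parts are done
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

The caller (for example a small executor thread) gets a future back right away and can register a callback on it, so no thread is parked while uploads are in flight. If any part fails, the combined future completes exceptionally.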

}
}
try {
exchangeStorage.createEmptyFile(outputDirectory.resolve(COMMITTED_MARKER_FILE_NAME));
Contributor

Same here. I wonder if this should be non-blocking.
