Implement chunked fetch streaming with circuit breaker integration #139124
drempapis merged 447 commits into elastic:main
Conversation
Resolved review threads (outdated) on:
- server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java
- ...rc/main/java/org/elasticsearch/search/fetch/chunk/TransportFetchPhaseCoordinationAction.java
@elasticmachine run elasticsearch-ci/part-2
DaveCTurner left a comment:
Couple of thoughts about the blocking of threads.
Resolved review threads (outdated) on:
- server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
- server/src/main/java/org/elasticsearch/search/fetch/FetchPhaseDocsIterator.java
@elasticmachine run elasticsearch-ci/part-1
@elasticmachine run elasticsearch-ci/part-2
…rch into chunked_fetch_phase
Referenced line in FetchPhaseDocsIterator.java:

    nextChunk = queue.poll(100, TimeUnit.MILLISECONDS);
Ok this is definitely better because at least it's only blocking a thread while fetching the docs locally, but now we need two threads.
I'm partly to blame for suggesting a ThrottledIterator here. That would have worked if we could have moved the fetch-from-disk process between threads but it doesn't fit here given the single-threadedness constraint. I think instead we need a new ThrottledTaskRunner("fetch", maxInFlightChunks, EsExecutors.DIRECT_EXECUTOR_SERVICE) to manage the queue.
@DaveCTurner thank you for the feedback!
I want to make sure I understand correctly. When you say to use ThrottledTaskRunner with DIRECT_EXECUTOR_SERVICE, do you mean:
- Eliminate the producer-consumer pattern entirely and have the Lucene thread enqueue send tasks directly to the ThrottledTaskRunner, which runs them inline when under capacity, or
- Keep the producer-consumer pattern, but replace the ThrottledIterator with a ThrottledTaskRunner on the consumer side?
I've updated the implementation to use ThrottledTaskRunner.
The iterateAsync method now uses a single ThrottledTaskRunner("fetch", maxInFlightChunks, DIRECT_EXECUTOR_SERVICE) to manage chunk sends:
- The calling thread fetches documents sequentially, serializes hits into chunks, and enqueues send tasks directly to the ThrottledTaskRunner.
- Tasks run immediately on the calling thread via DIRECT_EXECUTOR_SERVICE when fewer than maxInFlightChunks are in flight.
- When at the limit, tasks queue internally until ACK callbacks signal completion, which triggers the queued tasks.
This is better than the custom producer/consumer implementation: no thread blocks waiting for network I/O, the producer thread is freed immediately after enqueueing, and memory usage is throttled by the circuit breaker on the data nodes to protect against OOM.
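The throttling behaviour described above can be sketched with a minimal, self-contained stand-in (MiniThrottledRunner and its methods are hypothetical illustrations; Elasticsearch's real ThrottledTaskRunner takes an ActionListener<Releasable> and an Executor, and this sketch only mimics its direct-executor semantics):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical minimal stand-in for a throttled task runner: at most
// maxInFlight tasks run concurrently; tasks beyond the limit queue until a
// running task is released (here, the ACK callback for a sent chunk).
class MiniThrottledRunner {
    private final int maxInFlight;
    private int running = 0;
    private final Queue<Runnable> queued = new ArrayDeque<>();

    MiniThrottledRunner(int maxInFlight) { this.maxInFlight = maxInFlight; }

    // Enqueue a task; with a direct executor it runs inline when under capacity.
    synchronized void enqueueTask(Runnable task) {
        if (running < maxInFlight) {
            running++;
            task.run();          // direct executor: runs on the calling thread
        } else {
            queued.add(task);    // over capacity: hold until a release() arrives
        }
    }

    // Called from the ACK callback when a chunk send completes.
    synchronized void release() {
        Runnable next = queued.poll();
        if (next != null) {
            next.run();          // freed capacity is immediately reused
        } else {
            running--;
        }
    }
}

public class ThrottleDemo {
    public static void main(String[] args) {
        MiniThrottledRunner runner = new MiniThrottledRunner(2);
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int chunk = i;
            runner.enqueueTask(() -> log.add("send chunk " + chunk));
        }
        // Only the first two chunks are "in flight"; the rest are queued.
        System.out.println(log);
        runner.release();        // ACK for chunk 0 triggers queued chunk 2
        runner.release();        // ACK for chunk 1 triggers queued chunk 3
        System.out.println(log);
    }
}
```

Note how no thread ever blocks in this scheme: enqueueTask either runs the task inline or parks it in a plain queue, in contrast to the earlier queue.poll(100, TimeUnit.MILLISECONDS) consumer, which held a thread while waiting.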
@elasticmachine run elasticsearch-ci/part-2
Buildkite benchmark this with noaa-3n-2g please
Buildkite benchmark this with esql please
Buildkite benchmark this with geoshape please
💔 Build Failed
This build attempts two geoshape benchmarks to evaluate the performance impact of this PR. To estimate benchmark completion time, inspect previous nightly runs.
In the current implementation, when Elasticsearch executes a search query that returns a large number of documents, the fetch phase retrieves the actual document content from each shard, which can lead to significant memory pressure on data nodes.
This PR implements chunked streaming for the fetch phase to reduce memory pressure when handling large result sets. Instead of accumulating all search hits in memory on the data node before sending them to the coordinator, hits are streamed in configurable chunks (default: 256 KB) as they are produced. Memory usage is bounded by circuit breakers on both the data and coordinator nodes.
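As a rough illustration of the chunked streaming described above, the following sketch splits a serialized-hits buffer into fixed-size chunks so each network send stays within the chunk budget (the chunk helper and sizes here are hypothetical, not the PR's actual code; the real implementation streams chunks as hits are produced rather than buffering everything first):

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: split serialized hits into chunks of at most CHUNK_SIZE bytes.
public class ChunkDemo {
    static final int CHUNK_SIZE = 256 * 1024; // default chunk size per the PR description

    static List<byte[]> chunk(byte[] serializedHits) {
        List<byte[]> chunks = new ArrayList<>();
        for (int offset = 0; offset < serializedHits.length; offset += CHUNK_SIZE) {
            int len = Math.min(CHUNK_SIZE, serializedHits.length - offset);
            byte[] c = new byte[len];
            System.arraycopy(serializedHits, offset, c, 0, len);
            chunks.add(c); // each chunk is sent (and ACKed) independently
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] hits = new byte[600 * 1024]; // e.g. 600 KB of serialized hits
        List<byte[]> chunks = chunk(hits);
        System.out.println(chunks.size());        // 256 KB + 256 KB + 88 KB
        System.out.println(chunks.get(2).length); // size of the final partial chunk
    }
}
```

Because each chunk is accounted against the circuit breaker before it is sent and released on ACK, the peak memory held for a response is bounded by maxInFlightChunks times the chunk size rather than by the full result set.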
How OOM is Prevented on the Data Node
How OOM is Prevented on the Coordinator Node
Flow Diagram
The implementation follows the paradigm of TransportRepositoryVerifyIntegrityCoordinationAction, but it streams only between the coordinator and the data nodes.