Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
bb7e126
initial commit
drempapis Feb 24, 2026
85ffe67
Merge remote-tracking branch 'origin/main' into fix/release-breaker-a…
drempapis Feb 24, 2026
80991f2
update after review
drempapis Feb 24, 2026
4348a8d
update code
drempapis Feb 26, 2026
843fc64
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Feb 26, 2026
3f6f581
[CI] Auto commit changes from spotless
Feb 26, 2026
6574cc6
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 3, 2026
4184b9a
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 3, 2026
7a6b29e
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 3, 2026
e8f0e02
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 4, 2026
b85733b
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 5, 2026
5a37cde
update codee
drempapis Mar 5, 2026
a26a1ec
update after review
drempapis Mar 5, 2026
9487e49
update after review
drempapis Mar 5, 2026
69d1e54
update after review
drempapis Mar 5, 2026
1ae96d2
update after review
drempapis Mar 5, 2026
5444e46
update after review
drempapis Mar 5, 2026
ea8a10d
udpate
drempapis Mar 5, 2026
2fc3ee0
update code
drempapis Mar 5, 2026
dc07e82
update
drempapis Mar 5, 2026
9b700a8
update
drempapis Mar 5, 2026
1b6361f
update after review
drempapis Mar 6, 2026
2a1ef9a
update after review
drempapis Mar 6, 2026
d89663e
update after review
drempapis Mar 6, 2026
b16d813
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 6, 2026
4f5b108
[CI] Auto commit changes from spotless
Mar 6, 2026
8773f77
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 6, 2026
876f3e8
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 6, 2026
832b08c
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 6, 2026
2205ab4
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
3ce47fa
update after review
drempapis Mar 9, 2026
f9d7e43
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
320cedf
[CI] Auto commit changes from spotless
Mar 9, 2026
b75717d
update after review
drempapis Mar 9, 2026
c15da98
update after review
drempapis Mar 9, 2026
a027e21
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
47082a4
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
b8da157
update after review
drempapis Mar 9, 2026
4e8a993
Merge branch 'fix/release-breaker-after-send-response' of github.com:…
drempapis Mar 9, 2026
0507aaf
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
391165c
Update newNetworkBytesStream method signature
drempapis Mar 9, 2026
b825df8
Fix mock behavior for newNetworkBytesStream method
drempapis Mar 9, 2026
a74f69d
Fix merge conflict in TransportService.java
drempapis Mar 9, 2026
7797454
Remove default implementation for newNetworkBytesStream
drempapis Mar 9, 2026
2a4f7c9
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
10cf010
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
00767dd
[CI] Auto commit changes from spotless
Mar 9, 2026
cf59664
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 9, 2026
929532e
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 17, 2026
e8023db
update after review
drempapis Mar 17, 2026
506a922
update after review
drempapis Mar 17, 2026
ff2a4c9
update after review
drempapis Mar 17, 2026
8e78ca8
update after review
drempapis Mar 17, 2026
94a89c5
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 17, 2026
cf6c215
update after review
drempapis Mar 18, 2026
c7a08d1
[CI] Auto commit changes from spotless
Mar 18, 2026
3682c19
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 18, 2026
e2233ce
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 18, 2026
97e14de
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 18, 2026
3f41823
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 18, 2026
ac496e3
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 18, 2026
a63d189
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 19, 2026
1cd903c
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 19, 2026
82badc8
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 19, 2026
3fade55
update naming and small fixes after review
drempapis Mar 19, 2026
a89f2d2
update after review
drempapis Mar 19, 2026
27a750d
update after review
drempapis Mar 19, 2026
3be8c42
update after review
drempapis Mar 19, 2026
cc26220
update class name
drempapis Mar 19, 2026
1fcac86
Merge branch 'main' into fix/release-breaker-after-send-response
drempapis Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -503,7 +505,10 @@ public void onFailure(Exception e) {
Exception.class,
client.prepareSearch("test").addAggregation(new TestAggregationBuilder("test"))
);
assertThat(exc.getCause().getMessage(), containsString("<reduce_aggs>"));
assertNotNull(
"root cause must be a CircuitBreakingException",
ExceptionsHelper.unwrap(exc, CircuitBreakingException.class)
);
});

final AtomicArray<Exception> exceptions = new AtomicArray<>(10);
Expand All @@ -530,7 +535,10 @@ public void onFailure(Exception exc) {
latch.await();
assertThat(exceptions.asList().size(), equalTo(10));
for (Exception exc : exceptions.asList()) {
assertThat(exc.getCause().getMessage(), containsString("<reduce_aggs>"));
assertNotNull(
"root cause must be a CircuitBreakingException",
ExceptionsHelper.unwrap(exc, CircuitBreakingException.class)
);
}
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -21,6 +22,8 @@
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -50,9 +53,12 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.transport.BytesTransportResponse;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TaskTransportChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -457,7 +463,7 @@ public static void registerRequestHandler(
(request, channel, task) -> searchService.executeQueryPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
channelListener(transportService, channel, searchService.getCircuitBreaker())
)
);
TransportActionProxy.registerProxyActionWithDynamicResponseType(
Expand Down Expand Up @@ -513,7 +519,7 @@ public static void registerRequestHandler(
(request, channel, task) -> searchService.executeFetchPhase(
request,
(SearchShardTask) task,
new ChannelActionListener<>(channel)
channelListener(transportService, channel, searchService.getCircuitBreaker())
)
);
TransportActionProxy.registerProxyAction(
Expand Down Expand Up @@ -541,7 +547,11 @@ public static void registerRequestHandler(
);

final TransportRequestHandler<ShardFetchRequest> shardFetchRequestHandler = (request, channel, task) -> searchService
.executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel));
.executeFetchPhase(
request,
(SearchShardTask) task,
channelListener(transportService, channel, searchService.getCircuitBreaker())
);
transportService.registerRequestHandler(
FETCH_ID_SCROLL_ACTION_NAME,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
Expand Down Expand Up @@ -672,6 +682,75 @@ private boolean assertNodePresent() {
}
}

/**
* Returns a listener that serializes responses to bytes on the network path.
*
* <p>On the <b>network path</b>, the response is serialized into bytes using a
* circuit-breaker-aware stream and sent as a {@link BytesTransportResponse}.
*
* <p>On the <b>direct (same-node) path</b> the response is forwarded as-is.
*
* <p>Circuit-breaker accounting for response objects is handled by the caller.
*/
static <T extends TransportResponse> ActionListener<T> channelListener(
TransportService transportService,
TransportChannel channel,
@Nullable CircuitBreaker circuitBreaker
) {
if (isDirectResponseChannel(channel)) {
return new ChannelActionListener<>(channel);
}
return new NetworkPathListener<>(transportService, channel, circuitBreaker);
}

private static boolean isDirectResponseChannel(TransportChannel channel) {
if (channel instanceof TaskTransportChannel ttc) {
channel = ttc.getChannel();
}
return TransportService.isDirectResponseChannel(channel);
}

/**
* Serializes the response into a {@link BytesTransportResponse} while keeping the breaker-accounted
* bytes alive for the response lifecycle. Captures the transport version from the channel at
* construction time and reuses it for serialization and the response metadata.
*/
private static class NetworkPathListener<T extends TransportResponse> implements ActionListener<T> {
private final TransportService transportService;
private final TransportVersion transportVersion;
private final ChannelActionListener<BytesTransportResponse> channelListener;
@Nullable
private final CircuitBreaker circuitBreaker;

NetworkPathListener(TransportService transportService, TransportChannel channel, @Nullable CircuitBreaker circuitBreaker) {
this.transportService = transportService;
this.transportVersion = channel.getVersion();
this.channelListener = new ChannelActionListener<>(channel);
this.circuitBreaker = circuitBreaker;
}

@Override
public void onResponse(T response) {
// The bytes reference keeps breaker-accounted bytes; the stream output closes after serialization.
final ReleasableBytesReference bytesRef;
try (var out = transportService.newNetworkBytesStream(circuitBreaker)) {
out.setTransportVersion(transportVersion);
response.writeTo(out);
bytesRef = out.moveToBytesReference();
} catch (Exception e) {
channelListener.onFailure(e);
return;
}
// respondAndRelease releases the bytes once the transport layer completes.
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(bytesRef, transportVersion));
}

@Override
public void onFailure(Exception e) {
channelListener.onFailure(e);
}
}

public void cancelSearchTask(SearchTask task, String reason) {
CancelTasksRequest req = new CancelTasksRequest().setTargetTaskId(new TaskId(client.getLocalNodeId(), task.getId()))
.setReason("Fatal failure during search: " + reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public final class FetchSearchResult extends SearchPhaseResult {

private SearchHits hits;

private transient long searchHitsSizeBytes = 0L;
private long searchHitsSizeBytes = 0L;

// client side counter
private transient int counter;
Expand Down
Loading
Loading