Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected NodeHotThreads nodeOperation(NodeRequest request, Task task) {
.interval(request.requestOptions.interval())
.threadElementsSnapshotCount(request.requestOptions.snapshots())
.ignoreIdleThreads(request.requestOptions.ignoreIdleThreads());
final var out = transportService.newNetworkBytesStream();
final var out = transportService.newNetworkBytesStream(null);
final var trackedResource = LeakTracker.wrap(out);
var success = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ void onShardDone() {
return;
}
var channelListener = new ChannelActionListener<>(channel);
RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream();
RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(null);
out.setTransportVersion(channel.getVersion());

boolean success = false;
Expand Down Expand Up @@ -997,7 +997,7 @@ void bwcRespond() {
}
}
final int resultCount = queryPhaseResultConsumer.getNumShards();
out = dependencies.transportService.newNetworkBytesStream();
out = dependencies.transportService.newNetworkBytesStream(null);
out.setTransportVersion(channel.getVersion());
try {
out.writeVInt(resultCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private ReleasableBytesReference maybeSerializeClusterState(
}
assert clusterState.nodes().isLocalNodeElectedMaster();

try (var bytesStream = transportService.newNetworkBytesStream()) {
try (var bytesStream = transportService.newNetworkBytesStream(null)) {
try (var stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))) {
stream.setTransportVersion(version);
clusterState.writeTo(stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu
}

private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream(null)) {
final long uncompressedBytes;
try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))) {
stream.setTransportVersion(version);
Expand Down Expand Up @@ -278,7 +278,7 @@ private ReleasableBytesReference serializeDiffClusterState(
TransportVersion version
) {
final long clusterStateVersion = newState.version();
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream(null)) {
final long uncompressedBytes;
try (StreamOutput stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))) {
stream.setTransportVersion(version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@
* a short time, for instance soon being written to the network or to disk, then the imminent recycling of these pages may mean it is ok to
* keep it as-is. For results which are both small and long-lived it may be better to copy them into a freshly-allocated {@code byte[]}.
* <p>
* Any memory allocated in this way is not tracked by the {@link org.elasticsearch.common.breaker} subsystem, even if the
* {@code Recycler<BytesRef>} was obtained from {@link BigArrays#bytesRefRecycler()}, unless the caller takes steps to add this tracking
* themselves.
* Any memory allocated in this way is tracked by the {@link org.elasticsearch.common.breaker} subsystem if and only if the caller passes in
* a non-null {@link CircuitBreaker} at creation time. If the provided {@link CircuitBreaker} is {@code null} then the allocations performed
* here are untracked by circuit-breakers, even if the {@code Recycler<BytesRef>} was obtained from {@link BigArrays#bytesRefRecycler()}.
*/
public class RecyclerBytesStreamOutput extends BytesStream implements Releasable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -987,8 +988,8 @@ final Set<TcpChannel> getAcceptedChannels() {
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream() {
return new RecyclerBytesStreamOutput(recycler);
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return new RecyclerBytesStreamOutput(recycler, circuitBreaker);
}

/**
Expand Down
14 changes: 9 additions & 5 deletions server/src/main/java/org/elasticsearch/transport/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.TimeValue;

Expand All @@ -31,8 +33,6 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import static org.elasticsearch.transport.BytesRefRecycler.NON_RECYCLING_INSTANCE;

public interface Transport extends LifecycleComponent {

/**
Expand Down Expand Up @@ -88,9 +88,13 @@ default boolean isSecure() {

RequestHandlers getRequestHandlers();

default RecyclerBytesStreamOutput newNetworkBytesStream() {
return new RecyclerBytesStreamOutput(NON_RECYCLING_INSTANCE);
}
/**
* @return a {@link RecyclerBytesStreamOutput} which allocates its pages with {@code org.elasticsearch.transport.netty4.NettyAllocator},
* tracking these allocations using the provided {@link CircuitBreaker} if this is not {@code null}.
* <p>
* In tests in which Netty is not in use, each page is allocated as a {@code new byte[]}.
*/
RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker);

/**
* A unidirectional connection to a {@link DiscoveryNode}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -660,8 +661,14 @@ public ConnectionManager getConnectionManager() {
return connectionManager;
}

public RecyclerBytesStreamOutput newNetworkBytesStream() {
return transport.newNetworkBytesStream();
/**
* @return a {@link RecyclerBytesStreamOutput} which allocates its pages with {@code org.elasticsearch.transport.netty4.NettyAllocator},
* tracking these allocations using the provided {@link CircuitBreaker} if this is not {@code null}.
* <p>
* In tests in which Netty is not in use, each page is allocated as a {@code new byte[]}.
*/
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return transport.newNetworkBytesStream(circuitBreaker);
}

static class HandshakeRequest extends AbstractTransportRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -39,6 +41,7 @@
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesRefRecycler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.NodeNotConnectedException;
Expand Down Expand Up @@ -835,6 +838,11 @@ public ResponseHandlers getResponseHandlers() {
public RequestHandlers getRequestHandlers() {
return requestHandlers;
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE, circuitBreaker);
}
}

private static void connectToNodes(NodeConnectionsService service, DiscoveryNodes discoveryNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.BatchSummary;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
Expand Down Expand Up @@ -84,7 +85,9 @@ public void testDiffSerializationFailure() {

final TransportService transportService = mock(TransportService.class);
final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
when(transportService.newNetworkBytesStream()).then(invocation -> new RecyclerBytesStreamOutput(recycler));
when(transportService.newNetworkBytesStream(any())).then(
invocation -> new RecyclerBytesStreamOutput(recycler, invocation.getArgument(0))
);
Transport.Connection connection = mock(Transport.Connection.class);
when(connection.getTransportVersion()).thenReturn(TransportVersion.current());
when(transportService.getConnection(any())).thenReturn(connection);
Expand Down Expand Up @@ -206,8 +209,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream() {
return new RecyclerBytesStreamOutput(recycler);
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return new RecyclerBytesStreamOutput(recycler, circuitBreaker);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.version.CompatibilityVersionsUtils;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.logging.activity.ActivityLogWriterProvider;
Expand All @@ -94,6 +95,7 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
Expand Down Expand Up @@ -558,9 +560,9 @@ protected NamedWriteableRegistry writeableRegistry() {
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream() {
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
// skip leak checks in these tests since they do indeed leak
return new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE);
return new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE, circuitBreaker);
// TODO fix these leaks and implement leak checking
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testSendLocalRequestBytesTransportResponseSameVersion() throws Excep
assertEquals(request.sourceNode, "TS_A");

SimpleTestResponse tsB = new SimpleTestResponse("TS_B");
try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) {
try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream(null)) {
out.setTransportVersion(transportVersion1);
tsB.writeTo(out);
// simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse
Expand Down Expand Up @@ -417,7 +417,7 @@ public void testSendLocalRequestBytesTransportResponseDifferentVersions() throws
assertEquals(request.sourceNode, "TS_A");

SimpleTestResponse tsB = new SimpleTestResponse("TS_B");
try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) {
try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream(null)) {
out.setTransportVersion(transportVersion1);
tsB.writeTo(out);
// simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -1096,8 +1097,8 @@ public String toString() {
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream() {
return new RecyclerBytesStreamOutput(clearableRecycler);
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return new RecyclerBytesStreamOutput(clearableRecycler, circuitBreaker);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.BytesRefRecycler;
import org.elasticsearch.transport.CloseableConnection;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -104,6 +108,11 @@ public RequestHandlers getRequestHandlers() {
return requestHandlers;
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return new RecyclerBytesStreamOutput(BytesRefRecycler.NON_RECYCLING_INSTANCE, circuitBreaker);
}

@Override
protected void doStart() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.BytesRefRecycler;
import org.elasticsearch.transport.ConnectionProfile;
Expand Down Expand Up @@ -343,7 +345,7 @@ default void clearCallback() {}
}

@Override
public RecyclerBytesStreamOutput newNetworkBytesStream() {
return new RecyclerBytesStreamOutput(new BytesRefRecycler(recycler));
public RecyclerBytesStreamOutput newNetworkBytesStream(@Nullable CircuitBreaker circuitBreaker) {
return new RecyclerBytesStreamOutput(new BytesRefRecycler(recycler), circuitBreaker);
}
}