Skip to content

Commit fe470d5

Browse files
committed
Implement byte array reusage in NioTransport (#27696)
This is related to #27563. This commit modifies the InboundChannelBuffer to support releasable byte pages. These byte pages are provided by the PageCacheRecycler. The PageCacheRecycler must be passed to the Transport with this change.
1 parent 0a368d1 commit fe470d5

File tree

43 files changed

+278
-90
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+278
-90
lines changed

core/src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.common.transport.TransportAddress;
4444
import org.elasticsearch.common.unit.TimeValue;
4545
import org.elasticsearch.common.util.BigArrays;
46+
import org.elasticsearch.common.util.PageCacheRecycler;
4647
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4748
import org.elasticsearch.indices.breaker.CircuitBreakerService;
4849
import org.elasticsearch.node.InternalSettingsPreparer;
@@ -168,11 +169,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
168169
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
169170
settingsModule.getClusterSettings());
170171
resourcesToClose.add(circuitBreakerService);
171-
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
172+
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
173+
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
172174
resourcesToClose.add(bigArrays);
173175
modules.add(settingsModule);
174176
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
175-
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
177+
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
176178
final Transport transport = networkModule.getTransportSupplier().get();
177179
final TransportService transportService = new TransportService(settings, transport, threadPool,
178180
networkModule.getTransportInterceptor(),

core/src/main/java/org/elasticsearch/common/network/NetworkModule.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.settings.Setting.Property;
3535
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.common.util.BigArrays;
37+
import org.elasticsearch.common.util.PageCacheRecycler;
3738
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3839
import org.elasticsearch.common.xcontent.XContentParser;
3940
import org.elasticsearch.http.HttpServerTransport;
@@ -107,6 +108,7 @@ public final class NetworkModule {
107108
*/
108109
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
109110
BigArrays bigArrays,
111+
PageCacheRecycler pageCacheRecycler,
110112
CircuitBreakerService circuitBreakerService,
111113
NamedWriteableRegistry namedWriteableRegistry,
112114
NamedXContentRegistry xContentRegistry,
@@ -121,9 +123,9 @@ public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlu
121123
registerHttpTransport(entry.getKey(), entry.getValue());
122124
}
123125
}
124-
Map<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,
126+
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
125127
circuitBreakerService, namedWriteableRegistry, networkService);
126-
for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
128+
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
127129
registerTransport(entry.getKey(), entry.getValue());
128130
}
129131
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,

core/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,9 @@ public T set(long index, T value) {
372372
final boolean checkBreaker;
373373
private final BigArrays circuitBreakingInstance;
374374

375-
public BigArrays(Settings settings, @Nullable final CircuitBreakerService breakerService) {
375+
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
376376
// Checking the breaker is disabled if not specified
377-
this(new PageCacheRecycler(settings), breakerService, false);
377+
this(recycler, breakerService, false);
378378
}
379379

380380
// public for tests

core/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ public void close() {
6565
Releasables.close(true, bytePage, intPage, longPage, objectPage);
6666
}
6767

68-
protected PageCacheRecycler(Settings settings) {
68+
public PageCacheRecycler(Settings settings) {
6969
super(settings);
70-
final Type type = TYPE_SETTING .get(settings);
71-
final long limit = LIMIT_HEAP_SETTING .get(settings).getBytes();
70+
final Type type = TYPE_SETTING.get(settings);
71+
final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
7272
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
7373

7474
// We have a global amount of memory that we need to divide across data types.

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.elasticsearch.common.transport.TransportAddress;
8181
import org.elasticsearch.common.unit.TimeValue;
8282
import org.elasticsearch.common.util.BigArrays;
83+
import org.elasticsearch.common.util.PageCacheRecycler;
8384
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
8485
import org.elasticsearch.discovery.Discovery;
8586
import org.elasticsearch.discovery.DiscoveryModule;
@@ -364,7 +365,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
364365
modules.add(new GatewayModule());
365366

366367

367-
BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
368+
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
369+
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
368370
resourcesToClose.add(bigArrays);
369371
modules.add(settingsModule);
370372
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
@@ -404,7 +406,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
404406

405407
final RestController restController = actionModule.getRestController();
406408
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
407-
threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);
409+
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
410+
networkService, restController);
408411
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
409412
pluginsService.filterPlugins(Plugin.class).stream()
410413
.map(Plugin::getCustomMetaDataUpgrader)
@@ -896,8 +899,16 @@ public static CircuitBreakerService createCircuitBreakerService(Settings setting
896899
* Creates a new {@link BigArrays} instance used for this node.
897900
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
898901
*/
899-
BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreakerService) {
900-
return new BigArrays(settings, circuitBreakerService);
902+
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
903+
return new BigArrays(pageCacheRecycler, circuitBreakerService);
904+
}
905+
906+
/**
907+
* Creates a new {@link BigArrays} instance used for this node.
908+
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
909+
*/
910+
PageCacheRecycler createPageCacheRecycler(Settings settings) {
911+
return new PageCacheRecycler(settings);
901912
}
902913

903914
/**

core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.network.NetworkService;
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.common.util.BigArrays;
30+
import org.elasticsearch.common.util.PageCacheRecycler;
3031
import org.elasticsearch.common.util.concurrent.ThreadContext;
3132
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3233
import org.elasticsearch.http.HttpServerTransport;
@@ -58,6 +59,7 @@ default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegist
5859
* See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
5960
*/
6061
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
62+
PageCacheRecycler pageCacheRecycler,
6163
CircuitBreakerService circuitBreakerService,
6264
NamedWriteableRegistry namedWriteableRegistry,
6365
NetworkService networkService) {

core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.settings.Settings;
2323
import org.elasticsearch.common.util.MockBigArrays;
24+
import org.elasticsearch.common.util.MockPageCacheRecycler;
2425
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
2526
import org.elasticsearch.test.ESTestCase;
2627

@@ -30,7 +31,7 @@ public class ReleasableBytesStreamOutputTests extends ESTestCase {
3031

3132
public void testRelease() throws Exception {
3233
MockBigArrays mockBigArrays =
33-
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
34+
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
3435
try (ReleasableBytesStreamOutput output =
3536
getRandomReleasableBytesStreamOutput(mockBigArrays)) {
3637
output.writeBoolean(randomBoolean());

core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.transport.BoundTransportAddress;
2929
import org.elasticsearch.common.util.BigArrays;
30+
import org.elasticsearch.common.util.PageCacheRecycler;
3031
import org.elasticsearch.common.util.concurrent.ThreadContext;
3132
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3233
import org.elasticsearch.http.HttpInfo;
@@ -133,6 +134,7 @@ public void testRegisterTransport() {
133134
NetworkPlugin plugin = new NetworkPlugin() {
134135
@Override
135136
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
137+
PageCacheRecycler pageCacheRecycler,
136138
CircuitBreakerService circuitBreakerService,
137139
NamedWriteableRegistry namedWriteableRegistry,
138140
NetworkService networkService) {
@@ -193,6 +195,7 @@ public void testOverrideDefault() {
193195
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
194196
@Override
195197
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
198+
PageCacheRecycler pageCacheRecycler,
196199
CircuitBreakerService circuitBreakerService,
197200
NamedWriteableRegistry namedWriteableRegistry,
198201
NetworkService networkService) {
@@ -227,6 +230,7 @@ public void testDefaultKeys() {
227230
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
228231
@Override
229232
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
233+
PageCacheRecycler pageCacheRecycler,
230234
CircuitBreakerService circuitBreakerService,
231235
NamedWriteableRegistry namedWriteableRegistry,
232236
NetworkService networkService) {
@@ -306,7 +310,7 @@ public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistr
306310
}
307311

308312
private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) {
309-
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null,
310-
new NullDispatcher());
313+
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, null,
314+
xContentRegistry(), null, new NullDispatcher());
311315
}
312316
}

core/src/test/java/org/elasticsearch/common/util/BigArraysTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
public class BigArraysTests extends ESTestCase {
4343

4444
private BigArrays randombigArrays() {
45-
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
45+
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
4646
}
4747

4848
private BigArrays bigArrays;

core/src/test/java/org/elasticsearch/common/util/BytesRefHashTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class BytesRefHashTests extends ESSingleNodeTestCase {
4141
BytesRefHash hash;
4242

4343
private BigArrays randombigArrays() {
44-
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
44+
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
4545
}
4646

4747
private void newHash() {

0 commit comments

Comments
 (0)