Skip to content

Commit 1f8ff05

Browse files
committed
Revert "Remove dedicated SSL network write buffer (#41283)"
This reverts commit f65a86c.
1 parent c4cb050 commit 1f8ff05

File tree

22 files changed

+329
-480
lines changed

22 files changed

+329
-480
lines changed

libs/nio/src/main/java/org/elasticsearch/nio/FlushOperation.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525

2626
public class FlushOperation {
2727

28-
private static final ByteBuffer[] EMPTY_ARRAY = new ByteBuffer[0];
29-
3028
private final BiConsumer<Void, Exception> listener;
3129
private final ByteBuffer[] buffers;
3230
private final int[] offsets;
@@ -63,38 +61,19 @@ public void incrementIndex(int delta) {
6361
}
6462

6563
public ByteBuffer[] getBuffersToWrite() {
66-
return getBuffersToWrite(length);
67-
}
68-
69-
public ByteBuffer[] getBuffersToWrite(int maxBytes) {
7064
final int index = Arrays.binarySearch(offsets, internalIndex);
71-
final int offsetIndex = index < 0 ? (-(index + 1)) - 1 : index;
72-
final int finalIndex = Arrays.binarySearch(offsets, Math.min(internalIndex + maxBytes, length));
73-
final int finalOffsetIndex = finalIndex < 0 ? (-(finalIndex + 1)) - 1 : finalIndex;
65+
int offsetIndex = index < 0 ? (-(index + 1)) - 1 : index;
7466

75-
int nBuffers = (finalOffsetIndex - offsetIndex) + 1;
67+
ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex];
7668

77-
int firstBufferPosition = internalIndex - offsets[offsetIndex];
7869
ByteBuffer firstBuffer = buffers[offsetIndex].duplicate();
79-
firstBuffer.position(firstBufferPosition);
80-
if (nBuffers == 1 && firstBuffer.remaining() == 0) {
81-
return EMPTY_ARRAY;
82-
}
83-
84-
ByteBuffer[] postIndexBuffers = new ByteBuffer[nBuffers];
70+
firstBuffer.position(internalIndex - offsets[offsetIndex]);
8571
postIndexBuffers[0] = firstBuffer;
86-
int finalOffset = offsetIndex + nBuffers;
87-
int nBytes = firstBuffer.remaining();
8872
int j = 1;
89-
for (int i = (offsetIndex + 1); i < finalOffset; ++i) {
90-
ByteBuffer buffer = buffers[i].duplicate();
91-
nBytes += buffer.remaining();
92-
postIndexBuffers[j++] = buffer;
73+
for (int i = (offsetIndex + 1); i < buffers.length; ++i) {
74+
postIndexBuffers[j++] = buffers[i].duplicate();
9375
}
9476

95-
int excessBytes = Math.max(0, nBytes - maxBytes);
96-
ByteBuffer lastBuffer = postIndexBuffers[postIndexBuffers.length - 1];
97-
lastBuffer.limit(lastBuffer.limit() - excessBytes);
9877
return postIndexBuffers;
9978
}
10079
}

libs/nio/src/main/java/org/elasticsearch/nio/FlushReadyWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class FlushReadyWrite extends FlushOperation implements WriteOperation {
2727
private final SocketChannelContext channelContext;
2828
private final ByteBuffer[] buffers;
2929

30-
public FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
30+
FlushReadyWrite(SocketChannelContext channelContext, ByteBuffer[] buffers, BiConsumer<Void, Exception> listener) {
3131
super(buffers, listener);
3232
this.channelContext = channelContext;
3333
this.buffers = buffers;

libs/nio/src/main/java/org/elasticsearch/nio/InboundChannelBuffer.java

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.nio;
2121

22+
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
2223
import org.elasticsearch.nio.utils.ExceptionsHelper;
2324

2425
import java.nio.ByteBuffer;
@@ -139,11 +140,11 @@ public ByteBuffer[] sliceBuffersTo(long to) {
139140

140141
ByteBuffer[] buffers = new ByteBuffer[pageCount];
141142
Iterator<Page> pageIterator = pages.iterator();
142-
ByteBuffer firstBuffer = pageIterator.next().byteBuffer().duplicate();
143+
ByteBuffer firstBuffer = pageIterator.next().byteBuffer.duplicate();
143144
firstBuffer.position(firstBuffer.position() + offset);
144145
buffers[0] = firstBuffer;
145146
for (int i = 1; i < buffers.length; i++) {
146-
buffers[i] = pageIterator.next().byteBuffer().duplicate();
147+
buffers[i] = pageIterator.next().byteBuffer.duplicate();
147148
}
148149
if (finalLimit != 0) {
149150
buffers[buffers.length - 1].limit(finalLimit);
@@ -179,14 +180,14 @@ public Page[] sliceAndRetainPagesTo(long to) {
179180
Page[] pages = new Page[pageCount];
180181
Iterator<Page> pageIterator = this.pages.iterator();
181182
Page firstPage = pageIterator.next().duplicate();
182-
ByteBuffer firstBuffer = firstPage.byteBuffer();
183+
ByteBuffer firstBuffer = firstPage.byteBuffer;
183184
firstBuffer.position(firstBuffer.position() + offset);
184185
pages[0] = firstPage;
185186
for (int i = 1; i < pages.length; i++) {
186187
pages[i] = pageIterator.next().duplicate();
187188
}
188189
if (finalLimit != 0) {
189-
pages[pages.length - 1].byteBuffer().limit(finalLimit);
190+
pages[pages.length - 1].byteBuffer.limit(finalLimit);
190191
}
191192

192193
return pages;
@@ -216,9 +217,9 @@ public ByteBuffer[] sliceBuffersFrom(long from) {
216217
ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex];
217218
Iterator<Page> pageIterator = pages.descendingIterator();
218219
for (int i = buffers.length - 1; i > 0; --i) {
219-
buffers[i] = pageIterator.next().byteBuffer().duplicate();
220+
buffers[i] = pageIterator.next().byteBuffer.duplicate();
220221
}
221-
ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer().duplicate();
222+
ByteBuffer firstPostIndexBuffer = pageIterator.next().byteBuffer.duplicate();
222223
firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage);
223224
buffers[0] = firstPostIndexBuffer;
224225

@@ -267,4 +268,53 @@ private int pageIndex(long index) {
267268
private int indexInPage(long index) {
268269
return (int) (index & PAGE_MASK);
269270
}
271+
272+
public static class Page implements AutoCloseable {
273+
274+
private final ByteBuffer byteBuffer;
275+
// This is reference counted as some implementations want to retain the byte pages by calling
276+
// sliceAndRetainPagesTo. With reference counting we can increment the reference count, return the
277+
// pages, and safely close them when this channel buffer is done with them. The reference count
278+
// would be 1 at that point, meaning that the pages will remain until the implementation closes
279+
// theirs.
280+
private final RefCountedCloseable refCountedCloseable;
281+
282+
public Page(ByteBuffer byteBuffer, Runnable closeable) {
283+
this(byteBuffer, new RefCountedCloseable(closeable));
284+
}
285+
286+
private Page(ByteBuffer byteBuffer, RefCountedCloseable refCountedCloseable) {
287+
this.byteBuffer = byteBuffer;
288+
this.refCountedCloseable = refCountedCloseable;
289+
}
290+
291+
private Page duplicate() {
292+
refCountedCloseable.incRef();
293+
return new Page(byteBuffer.duplicate(), refCountedCloseable);
294+
}
295+
296+
public ByteBuffer getByteBuffer() {
297+
return byteBuffer;
298+
}
299+
300+
@Override
301+
public void close() {
302+
refCountedCloseable.decRef();
303+
}
304+
305+
private static class RefCountedCloseable extends AbstractRefCounted {
306+
307+
private final Runnable closeable;
308+
309+
private RefCountedCloseable(Runnable closeable) {
310+
super("byte array page");
311+
this.closeable = closeable;
312+
}
313+
314+
@Override
315+
protected void closeInternal() {
316+
closeable.run();
317+
}
318+
}
319+
}
270320
}

libs/nio/src/main/java/org/elasticsearch/nio/Page.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ protected int flushToChannel(FlushOperation flushOperation) throws IOException {
325325
ioBuffer.clear();
326326
ioBuffer.limit(Math.min(WRITE_LIMIT, ioBuffer.limit()));
327327
int j = 0;
328-
ByteBuffer[] buffers = flushOperation.getBuffersToWrite(WRITE_LIMIT);
328+
ByteBuffer[] buffers = flushOperation.getBuffersToWrite();
329329
while (j < buffers.length && ioBuffer.remaining() > 0) {
330330
ByteBuffer buffer = buffers[j++];
331331
copyBytes(buffer, ioBuffer);

libs/nio/src/test/java/org/elasticsearch/nio/BytesChannelContextTests.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.function.Consumer;
3232

3333
import static org.mockito.Matchers.any;
34-
import static org.mockito.Matchers.anyInt;
3534
import static org.mockito.Matchers.eq;
3635
import static org.mockito.Mockito.mock;
3736
import static org.mockito.Mockito.times;
@@ -169,7 +168,7 @@ public void testQueuedWriteIsFlushedInFlushCall() throws Exception {
169168

170169
assertTrue(context.readyForFlush());
171170

172-
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(buffers);
171+
when(flushOperation.getBuffersToWrite()).thenReturn(buffers);
173172
when(flushOperation.isFullyFlushed()).thenReturn(false, true);
174173
when(flushOperation.getListener()).thenReturn(listener);
175174
context.flushChannel();
@@ -188,7 +187,7 @@ public void testPartialFlush() throws IOException {
188187
assertTrue(context.readyForFlush());
189188

190189
when(flushOperation.isFullyFlushed()).thenReturn(false);
191-
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
190+
when(flushOperation.getBuffersToWrite()).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
192191
context.flushChannel();
193192

194193
verify(listener, times(0)).accept(null, null);
@@ -202,8 +201,8 @@ public void testMultipleWritesPartialFlushes() throws IOException {
202201
BiConsumer<Void, Exception> listener2 = mock(BiConsumer.class);
203202
FlushReadyWrite flushOperation1 = mock(FlushReadyWrite.class);
204203
FlushReadyWrite flushOperation2 = mock(FlushReadyWrite.class);
205-
when(flushOperation1.getBuffersToWrite(anyInt())).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
206-
when(flushOperation2.getBuffersToWrite(anyInt())).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
204+
when(flushOperation1.getBuffersToWrite()).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
205+
when(flushOperation2.getBuffersToWrite()).thenReturn(new ByteBuffer[] {ByteBuffer.allocate(3)});
207206
when(flushOperation1.getListener()).thenReturn(listener);
208207
when(flushOperation2.getListener()).thenReturn(listener2);
209208

@@ -238,7 +237,7 @@ public void testWhenIOExceptionThrownListenerIsCalled() throws IOException {
238237
assertTrue(context.readyForFlush());
239238

240239
IOException exception = new IOException();
241-
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(buffers);
240+
when(flushOperation.getBuffersToWrite()).thenReturn(buffers);
242241
when(rawChannel.write(any(ByteBuffer.class))).thenThrow(exception);
243242
when(flushOperation.getListener()).thenReturn(listener);
244243
expectThrows(IOException.class, () -> context.flushChannel());
@@ -253,7 +252,7 @@ public void testWriteIOExceptionMeansChannelReadyToClose() throws IOException {
253252
context.queueWriteOperation(flushOperation);
254253

255254
IOException exception = new IOException();
256-
when(flushOperation.getBuffersToWrite(anyInt())).thenReturn(buffers);
255+
when(flushOperation.getBuffersToWrite()).thenReturn(buffers);
257256
when(rawChannel.write(any(ByteBuffer.class))).thenThrow(exception);
258257

259258
assertFalse(context.selectorShouldClose());

libs/nio/src/test/java/org/elasticsearch/nio/FlushOperationTests.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -65,45 +65,29 @@ public void testMultipleFlushesWithCompositeBuffer() throws IOException {
6565
ByteBuffer[] byteBuffers = writeOp.getBuffersToWrite();
6666
assertEquals(3, byteBuffers.length);
6767
assertEquals(5, byteBuffers[0].remaining());
68-
ByteBuffer[] byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
69-
assertEquals(2, byteBuffersWithLimit.length);
70-
assertEquals(5, byteBuffersWithLimit[0].remaining());
71-
assertEquals(5, byteBuffersWithLimit[1].remaining());
7268

7369
writeOp.incrementIndex(5);
7470
assertFalse(writeOp.isFullyFlushed());
7571
byteBuffers = writeOp.getBuffersToWrite();
7672
assertEquals(2, byteBuffers.length);
7773
assertEquals(15, byteBuffers[0].remaining());
78-
assertEquals(3, byteBuffers[1].remaining());
79-
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
80-
assertEquals(1, byteBuffersWithLimit.length);
81-
assertEquals(10, byteBuffersWithLimit[0].remaining());
8274

8375
writeOp.incrementIndex(2);
8476
assertFalse(writeOp.isFullyFlushed());
8577
byteBuffers = writeOp.getBuffersToWrite();
8678
assertEquals(2, byteBuffers.length);
8779
assertEquals(13, byteBuffers[0].remaining());
88-
assertEquals(3, byteBuffers[1].remaining());
89-
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
90-
assertEquals(1, byteBuffersWithLimit.length);
91-
assertEquals(10, byteBuffersWithLimit[0].remaining());
9280

9381
writeOp.incrementIndex(15);
9482
assertFalse(writeOp.isFullyFlushed());
9583
byteBuffers = writeOp.getBuffersToWrite();
9684
assertEquals(1, byteBuffers.length);
9785
assertEquals(1, byteBuffers[0].remaining());
98-
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
99-
assertEquals(1, byteBuffersWithLimit.length);
100-
assertEquals(1, byteBuffersWithLimit[0].remaining());
10186

10287
writeOp.incrementIndex(1);
10388
assertTrue(writeOp.isFullyFlushed());
10489
byteBuffers = writeOp.getBuffersToWrite();
105-
assertEquals(0, byteBuffers.length);
106-
byteBuffersWithLimit = writeOp.getBuffersToWrite(10);
107-
assertEquals(0, byteBuffersWithLimit.length);
90+
assertEquals(1, byteBuffers.length);
91+
assertEquals(0, byteBuffers[0].remaining());
10892
}
10993
}

0 commit comments

Comments
 (0)