Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.elasticsearch.core.DirectAccessInput;

import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.function.IntFunction;

/**
Expand Down Expand Up @@ -74,14 +76,15 @@ public static <R> R withSlice(
@SuppressWarnings("unchecked")
R[] result = (R[]) new Object[1];
boolean available = dai.withByteBufferSlice(offset, length, bb -> {
assert bb.isDirect();
in.skipBytes(length);
result[0] = action.apply(MemorySegment.ofBuffer(bb));
});
if (available) {
return result[0];
}
}
return action.apply(copyOnHeap(in, Math.toIntExact(length), scratchSupplier));
return copyAndApply(in, Math.toIntExact(length), scratchSupplier, action);
}

/**
Expand All @@ -100,15 +103,29 @@ public static void checkInputType(IndexInput in) {
}
}

private static final boolean SUPPORTS_HEAP_SEGMENTS = Runtime.version().feature() >= 22;

/**
* Reads the given number of bytes from the current position of the
* given IndexInput into a heap-backed memory segment. The returned
* segment is sliced to exactly {@code bytesToRead} bytes, even if
* the underlying array is larger.
* Reads bytes from the index input and applies the action to a memory
* segment containing the data. On Java 22+ a heap-backed segment is
* used directly. On Java 21, where heap segments cannot be passed to
* native downcalls, the data is copied into a confined arena.
*/
private static MemorySegment copyOnHeap(IndexInput in, int bytesToRead, IntFunction<byte[]> scratchSupplier) throws IOException {
private static <R> R copyAndApply(
IndexInput in,
int bytesToRead,
IntFunction<byte[]> scratchSupplier,
CheckedFunction<MemorySegment, R, IOException> action
) throws IOException {
byte[] buf = scratchSupplier.apply(bytesToRead);
in.readBytes(buf, 0, bytesToRead);
return MemorySegment.ofArray(buf).asSlice(0, bytesToRead);
if (SUPPORTS_HEAP_SEGMENTS) {
return action.apply(MemorySegment.ofArray(buf).asSlice(0, bytesToRead));
}
try (Arena arena = Arena.ofConfined()) {
MemorySegment nativeSegment = arena.allocate(bytesToRead);
MemorySegment.copy(buf, 0, nativeSegment, ValueLayout.JAVA_BYTE, 0, bytesToRead);
return action.apply(nativeSegment);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.ByteBuffer;
import java.util.Arrays;

Expand Down Expand Up @@ -72,6 +74,26 @@ public void testWithSlicePlainIndexInput() throws Exception {
}
}

public void testPlainInputFallbackProducesNativeSegmentOnJava21() throws Exception {
byte[] data = randomByteArrayOfLength(256);
try (Directory dir = new NIOFSDirectory(createTempDir())) {
writeData(dir, data);
try (IndexInput in = dir.openInput(FILE_NAME, IOContext.DEFAULT)) {
assertFalse(in instanceof MemorySegmentAccessInput);
Copy link
Copy Markdown
Member

@thecoop thecoop Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertThat(in, not(instanceOf(MemorySegmentAccessInput.class))), which will give you the actual type it is

assertFalse(in instanceof DirectAccessInput);
IndexInputUtils.withSlice(in, data.length, byte[]::new, segment -> {
if (Runtime.version().feature() < 22) {
assertTrue("segment should be native-backed on Java 21", segment.heapBase().isEmpty());
}
byte[] buf = new byte[(int) segment.byteSize()];
MemorySegment.ofArray(buf).copyFrom(segment);
assertArrayEquals(data, buf);
return null;
});
}
}
}

// -- constructor validation tests -----------------------------------------

public void testES92ConstructorAcceptsPlainInput() throws Exception {
Expand Down Expand Up @@ -167,9 +189,12 @@ static class DirectAccessWrapper extends FilterIndexInput implements DirectAcces

@Override
public boolean withByteBufferSlice(long offset, long length, CheckedConsumer<ByteBuffer, IOException> action) throws IOException {
ByteBuffer bb = ByteBuffer.wrap(data, (int) offset, (int) length).asReadOnlyBuffer();
action.accept(bb);
return true;
try (Arena arena = Arena.ofConfined()) {
MemorySegment segment = arena.allocate(length);
MemorySegment.copy(data, (int) offset, segment, ValueLayout.JAVA_BYTE, 0, (int) length);
action.accept(segment.asByteBuffer().asReadOnlyBuffer());
return true;
}
}

@Override
Expand Down