diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java index ae0e4ea3be41a..9279bb67bf2f3 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java @@ -15,6 +15,10 @@ import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider; import org.elasticsearch.nativeaccess.lib.ZstdLibrary; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; + abstract class AbstractNativeAccess implements NativeAccess { protected static final Logger logger = LogManager.getLogger(NativeAccess.class); @@ -57,6 +61,12 @@ public CloseableByteBuffer newConfinedBuffer(int len) { return javaLib.newConfinedBuffer(len); } + @Override + public CloseableMappedByteBuffer map(FileChannel fileChannel, MapMode mode, long position, long size) throws IOException { + assert fileChannel != null && position >= 0 && size > 0; + return javaLib.map(fileChannel, mode, position, size); + } + @Override public boolean isMemoryLocked() { return isMemoryLocked; diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/CloseableMappedByteBuffer.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/CloseableMappedByteBuffer.java new file mode 100644 index 0000000000000..df72ee1185514 --- /dev/null +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/CloseableMappedByteBuffer.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.nativeaccess; + +/** A closeable buffer backed by a mapped file. */ +public interface CloseableMappedByteBuffer extends CloseableByteBuffer { + + /** + * Returns a slice of this buffer. Closing a slice does not close its parent. + */ + CloseableMappedByteBuffer slice(long index, long length); + + /** + * Prefetches the given offset and length. + */ + void prefetch(long offset, long length); +} diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java index d4404cb56949f..86ef79c1ba573 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java @@ -9,6 +9,9 @@ package org.elasticsearch.nativeaccess; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import java.nio.file.Path; import java.util.Optional; import java.util.OptionalLong; @@ -103,6 +106,13 @@ default WindowsFunctions getWindowsFunctions() { */ CloseableByteBuffer newConfinedBuffer(int len); + /** + * Creates a new {@link CloseableMappedByteBuffer} using a shared arena. The buffer can be used + * across multiple threads, and should be closed. + * @return the buffer + */ + CloseableMappedByteBuffer map(FileChannel fileChannel, MapMode mode, long position, long size) throws IOException; + /** * Possible stats for execution filtering. 
*/ diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java index 677d6169cb55b..a4766f0a9f114 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java @@ -12,6 +12,7 @@ import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import java.nio.channels.FileChannel; import java.nio.file.Path; import java.util.Optional; import java.util.OptionalLong; @@ -89,6 +90,12 @@ public CloseableByteBuffer newConfinedBuffer(int len) { return null; } + @Override + public CloseableMappedByteBuffer map(FileChannel fileChannel, FileChannel.MapMode mode, long position, long size) { + logger.warn("cannot map because native access is not available"); + return null; + } + @Override public Optional getVectorSimilarityFunctions() { logger.warn("cannot get vector distance because native access is not available"); diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/PosixNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/PosixNativeAccess.java index 43a0a3510e8ef..c170883232f0c 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/PosixNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/PosixNativeAccess.java @@ -10,10 +10,13 @@ package org.elasticsearch.nativeaccess; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.nativeaccess.jdk.PosixCloseableMappedByteBuffer; import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider; import org.elasticsearch.nativeaccess.lib.PosixCLibrary; import org.elasticsearch.nativeaccess.lib.VectorLibrary; +import java.io.IOException; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; @@ -191,6 +194,11 @@ public 
Optional getVectorSimilarityFunctions() { return Optional.ofNullable(vectorDistance); } + @Override + public CloseableMappedByteBuffer map(FileChannel fileChannel, FileChannel.MapMode mode, long position, long size) throws IOException { + return PosixCloseableMappedByteBuffer.ofShared(fileChannel, mode, position, size); + } + String rlimitToString(long value) { if (value == constants.RLIMIT_INFINITY()) { return "unlimited"; diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/WindowsNativeAccess.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/WindowsNativeAccess.java index aae4938783916..eb4b590df1627 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/WindowsNativeAccess.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/WindowsNativeAccess.java @@ -9,10 +9,13 @@ package org.elasticsearch.nativeaccess; +import org.elasticsearch.nativeaccess.jdk.JdkCloseableMappedByteBuffer; import org.elasticsearch.nativeaccess.lib.Kernel32Library; import org.elasticsearch.nativeaccess.lib.Kernel32Library.Handle; import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider; +import java.io.IOException; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; @@ -171,4 +174,9 @@ public WindowsFunctions getWindowsFunctions() { public Optional getVectorSimilarityFunctions() { return Optional.empty(); // not supported yet } + + @Override + public CloseableMappedByteBuffer map(FileChannel fileChannel, FileChannel.MapMode mode, long position, long size) throws IOException { + return JdkCloseableMappedByteBuffer.ofShared(fileChannel, mode, position, size); + } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableMappedByteBuffer.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableMappedByteBuffer.java new file mode 100644 index 0000000000000..ffc5b058a8df6 --- /dev/null +++ 
b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableMappedByteBuffer.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.nativeaccess.jdk; + +import org.elasticsearch.nativeaccess.CloseableMappedByteBuffer; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.util.Objects; + +public class JdkCloseableMappedByteBuffer implements CloseableMappedByteBuffer { + + private final Arena arena; + protected final MemorySegment segment; + private final MappedByteBuffer bufferView; + + public static JdkCloseableMappedByteBuffer ofShared(FileChannel fileChannel, MapMode mode, long position, long size) + throws IOException { + var arena = Arena.ofShared(); + var seg = fileChannel.map(mode, position, size, arena); + return new JdkCloseableMappedByteBuffer(seg, arena); + } + + protected JdkCloseableMappedByteBuffer(MemorySegment seg, Arena arena) { + this.arena = arena; + this.segment = seg; + this.bufferView = (MappedByteBuffer) seg.asByteBuffer(); + } + + @Override + public MappedByteBuffer buffer() { + return bufferView; + } + + @Override + public void close() { + if (arena != null) { + arena.close(); + } + } + + @Override + public CloseableMappedByteBuffer slice(long index, long length) { + var slice = segment.asSlice(index, length); + return new JdkCloseableMappedByteBuffer(slice, null); // closing a 
slice does not close the parent. + } + + @Override + public void prefetch(long offset, long length) { + Objects.checkFromIndexSize(offset, length, segment.byteSize()); + // no explicit action, override in subclass if needed. + } +} diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java index 14ff184ca02cd..4b1a7229d5b37 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java @@ -10,8 +10,12 @@ package org.elasticsearch.nativeaccess.jdk; import org.elasticsearch.nativeaccess.CloseableByteBuffer; +import org.elasticsearch.nativeaccess.CloseableMappedByteBuffer; import org.elasticsearch.nativeaccess.lib.JavaLibrary; +import java.io.IOException; +import java.nio.channels.FileChannel; + class JdkJavaLibrary implements JavaLibrary { @Override @@ -23,4 +27,9 @@ public CloseableByteBuffer newSharedBuffer(int len) { public CloseableByteBuffer newConfinedBuffer(int len) { return JdkCloseableByteBuffer.ofConfined(len); } + + @Override + public CloseableMappedByteBuffer map(FileChannel fileChannel, FileChannel.MapMode mode, long position, long size) throws IOException { + return JdkCloseableMappedByteBuffer.ofShared(fileChannel, mode, position, size); + } } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkPosixCLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkPosixCLibrary.java index 727f271d3a0c0..3f14f836e02d3 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkPosixCLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/JdkPosixCLibrary.java @@ -23,6 +23,8 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; +import java.lang.ref.Reference; +import 
java.util.Objects; import static java.lang.foreign.MemoryLayout.PathElement.groupElement; import static java.lang.foreign.ValueLayout.ADDRESS; @@ -37,6 +39,8 @@ class JdkPosixCLibrary implements PosixCLibrary { private static final Logger logger = LogManager.getLogger(JdkPosixCLibrary.class); + private static final int PAGE_SIZE; + // errno can change between system calls, so we capture it private static final StructLayout CAPTURE_ERRNO_LAYOUT = Linker.Option.captureStateLayout(); static final Linker.Option CAPTURE_ERRNO_OPTION = Linker.Option.captureCallState("errno"); @@ -53,6 +57,10 @@ class JdkPosixCLibrary implements PosixCLibrary { FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS) ); private static final MethodHandle mlockall$mh = downcallHandleWithErrno("mlockall", FunctionDescriptor.of(JAVA_INT, JAVA_INT)); + private static final MethodHandle madvise$mh = downcallHandleWithErrno( + "madvise", + FunctionDescriptor.of(JAVA_INT, JAVA_LONG, JAVA_LONG, JAVA_INT) + ); private static final MethodHandle fcntl$mh = downcallHandle( "fcntl", FunctionDescriptor.of(JAVA_INT, JAVA_INT, JAVA_INT, ADDRESS), @@ -92,6 +100,12 @@ class JdkPosixCLibrary implements PosixCLibrary { ); } fstat$mh = fstat; + + try { + PAGE_SIZE = (int) downcallHandle("getpagesize", FunctionDescriptor.of(JAVA_INT)).invokeExact(); + } catch (Throwable t) { + throw new AssertionError(t); + } } private static final MethodHandle socket$mh = downcallHandleWithErrno( "socket", @@ -182,6 +196,33 @@ public int mlockall(int flags) { } } + @Override + public int madvise(MemorySegment segment, long offset, long length, int advice) { + if (segment.isNative() == false) { + throw new IllegalArgumentException("unexpected non-native segment: " + segment); + } + Objects.checkFromIndexSize(offset, length, segment.byteSize()); + long base = segment.address() + offset; + try { + return (int) madvise$mh.invokeExact(errnoState, base, length, advice); + } catch (Throwable t) { + throw madviseError(t, segment); + } 
finally { + // protects the segment from potentially being GC'ed during our downcall + Reference.reachabilityFence(segment); + } + } + + static Error madviseError(Throwable t, MemorySegment segment) { + String msg = "madvise failed: segment=" + segment + ", scope=" + segment.scope() + ", isAlive=" + segment.scope().isAlive(); + return new AssertionError(msg, t); + } + + @Override + public int getPageSize() { + return PAGE_SIZE; + } + @Override public int fcntl(int fd, int cmd, FStore fst) { assert fst instanceof JdkFStore; diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/PosixCloseableMappedByteBuffer.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/PosixCloseableMappedByteBuffer.java new file mode 100644 index 0000000000000..e6c54987e5ab9 --- /dev/null +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/jdk/PosixCloseableMappedByteBuffer.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.nativeaccess.jdk; + +import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider; +import org.elasticsearch.nativeaccess.lib.PosixCLibrary; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.util.Objects; + +public class PosixCloseableMappedByteBuffer extends JdkCloseableMappedByteBuffer { + + static final PosixCLibrary LIB = NativeLibraryProvider.instance().getLibrary(PosixCLibrary.class); + static final int PAGE_SIZE = LIB.getPageSize(); + + public static PosixCloseableMappedByteBuffer ofShared(FileChannel fileChannel, MapMode mode, long position, long size) + throws IOException { + var arena = Arena.ofShared(); + var seg = fileChannel.map(mode, position, size, arena); + return new PosixCloseableMappedByteBuffer(seg, arena); + } + + protected PosixCloseableMappedByteBuffer(MemorySegment seg, Arena arena) { + super(seg, arena); + } + + @Override + public void prefetch(long offset, long length) { + Objects.checkFromIndexSize(offset, length, segment.byteSize()); + // Align offset with the page size, this is required for madvise. + // Compute the offset of the current position in the OS's page. + final long offsetInPage = (segment.address() + offset) % PAGE_SIZE; + offset -= offsetInPage; + length += offsetInPage; + if (offset < 0) { + // start of the page is before the start of this segment, ignore the first page. + offset += PAGE_SIZE; + length -= PAGE_SIZE; + if (length <= 0) { + // This segment has no data beyond the first page. 
+ return; + } + } + int ret = LIB.madvise(segment, offset, length, PosixCLibrary.POSIX_MADV_WILLNEED); + if (ret != 0) { + int errno = LIB.errno(); + throw new RuntimeException("madvise failed with (error=" + errno + "): " + LIB.strerror(errno)); + } + } +} diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java index 52b4a7aea2af8..02fe7893dc139 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java @@ -10,9 +10,17 @@ package org.elasticsearch.nativeaccess.lib; import org.elasticsearch.nativeaccess.CloseableByteBuffer; +import org.elasticsearch.nativeaccess.CloseableMappedByteBuffer; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; public non-sealed interface JavaLibrary extends NativeLibrary { CloseableByteBuffer newSharedBuffer(int len); CloseableByteBuffer newConfinedBuffer(int len); + + CloseableMappedByteBuffer map(FileChannel fileChannel, MapMode mode, long position, long size) throws IOException; + } diff --git a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/PosixCLibrary.java b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/PosixCLibrary.java index 20312bc6e141c..5221800f5819b 100644 --- a/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/PosixCLibrary.java +++ b/libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/PosixCLibrary.java @@ -11,6 +11,8 @@ import org.elasticsearch.nativeaccess.CloseableByteBuffer; +import java.lang.foreign.MemorySegment; + /** * Provides access to methods in libc.so available on POSIX systems. 
*/ @@ -22,6 +24,13 @@ public non-sealed interface PosixCLibrary extends NativeLibrary { /** socket type indicating a datagram-oriented socket */ int SOCK_DGRAM = 2; + int POSIX_MADV_NORMAL = 0; + int POSIX_MADV_RANDOM = 1; + int POSIX_MADV_SEQUENTIAL = 2; + int POSIX_MADV_WILLNEED = 3; + int POSIX_MADV_DONTNEED = 4; + int POSIX_MADV_NOREUSE = 5; + /** * Gets the effective userid of the current process. * @@ -64,6 +73,39 @@ interface RLimit { */ int mlockall(int flags); + /** + * Provides advice to the operating system about how a region of memory will be accessed, + * allowing the kernel to optimize memory management. + *

+ * This method is a thin wrapper around the POSIX {@code madvise(2)} system + * call. The call is advisory only and does not guarantee any specific behavior. + * + *

Requirements: + *

    + *
  • The starting address of the advised region ({@code segment} address plus {@code offset}) must be aligned to the system page size.
  • + *
  • {@code segment} must represent native (off-heap) memory. + * Passing a non-native {@link MemorySegment} will result in an {@link IllegalArgumentException}.
  • + *
+ * + * @param segment + * the memory segment containing the region to be advised; must refer to native memory, and its address plus {@code offset} must be page-size aligned + * @param length + * the length in bytes of the memory region starting at {@code offset} within {@code segment} + * @param advice + * the access pattern advice (for example {@code MADV_WILLNEED}, {@code MADV_DONTNEED}, {@code MADV_SEQUENTIAL}, etc.) + * @return + * {@code 0} on success, or {@code -1} on failure with {@code errno} set to indicate the error + * + * @throws IllegalArgumentException + * if {@code segment} does not represent native memory + * + * @see madvise manpage + */ + int madvise(MemorySegment segment, long offset, long length, int advice); + + /** Returns native page size. */ + int getPageSize(); + /** corresponds to struct stat64 */ interface Stat64 { long st_size(); diff --git a/libs/native/src/test/java/org/elasticsearch/nativeaccess/MappedByteBufferTests.java b/libs/native/src/test/java/org/elasticsearch/nativeaccess/MappedByteBufferTests.java new file mode 100644 index 0000000000000..cfa9e76543b6f --- /dev/null +++ b/libs/native/src/test/java/org/elasticsearch/nativeaccess/MappedByteBufferTests.java @@ -0,0 +1,159 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.nativeaccess; + +import org.apache.lucene.util.Unwrappable; +import org.elasticsearch.test.ESTestCase; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; +import static org.hamcrest.Matchers.equalTo; + +public class MappedByteBufferTests extends ESTestCase { + + static NativeAccess nativeAccess; + + @BeforeClass + public static void getAccess() { + nativeAccess = NativeAccess.instance(); + } + + public void testBasic() throws IOException { + int size = randomIntBetween(10, 4096); + testBasicImpl(size, 0); + } + + public void testBasicWithFileMapOffset() throws IOException { + int size = randomIntBetween(10, 4096); + testBasicImpl(size, 1); + } + + public void testBasicTiny() throws IOException { + int size = randomIntBetween(10, 20); + testBasicImpl(size, 0); + testBasicImpl(size, 1); + } + + void testBasicImpl(int size, int filePositionOffset) throws IOException { + var tmp = createTempDir(); + Path file = tmp.resolve("testBasic"); + Files.write(file, newByteArray(size, filePositionOffset), CREATE, WRITE); + // we need to unwrap our test-only file system layers + file = Unwrappable.unwrapAll(file); + int len = size - filePositionOffset; + try ( + FileChannel fileChannel = FileChannel.open(file, READ); + CloseableMappedByteBuffer mappedByteBuffer = nativeAccess.map(fileChannel, MapMode.READ_ONLY, filePositionOffset, len) + ) { + mappedByteBuffer.prefetch(0, len); + + var buffer = mappedByteBuffer.buffer(); + assertThat(buffer.position(), equalTo(0)); + assertThat(buffer.limit(), equalTo(len)); + assertThat((byte) (0), equalTo(buffer.get(0))); // expected first value + assertThat((byte) (len 
- 1), equalTo(buffer.get(len - 1))); // expected last value + mappedByteBuffer.prefetch(0, len); + + assertSliceOfBuffer(mappedByteBuffer, 0, len); + assertSliceOfBuffer(mappedByteBuffer, 1, len - 1); + assertSliceOfBuffer(mappedByteBuffer, 2, len - 2); + assertSliceOfBuffer(mappedByteBuffer, 3, len - 3); + + assertOutOfBounds(mappedByteBuffer, len); + } + } + + public void testPrefetchWithOffsets() throws IOException { + testPrefetchWithOffsetsImpl(0); + testPrefetchWithOffsetsImpl(1); + testPrefetchWithOffsetsImpl(2); + testPrefetchWithOffsetsImpl(3); + } + + // We just check that the variations do not fail or crash - no + // positive assertion that the prefetch has any observable effect + void testPrefetchWithOffsetsImpl(int filePositionOffset) throws IOException { + int size = randomIntBetween(10, 4096); + var tmp = createTempDir(); + Path file = tmp.resolve("testPrefetchWithOffsets"); + Files.write(file, newByteArray(size, 0), CREATE, WRITE); + // we need to unwrap our test-only file system layers + file = Unwrappable.unwrapAll(file); + int len = size - filePositionOffset; + try ( + FileChannel fileChannel = FileChannel.open(file, READ); + CloseableMappedByteBuffer mappedByteBuffer = nativeAccess.map(fileChannel, MapMode.READ_ONLY, filePositionOffset, len) + ) { + mappedByteBuffer.prefetch(0, len); + mappedByteBuffer.prefetch(0, 0); + mappedByteBuffer.prefetch(0, len - 1); + mappedByteBuffer.prefetch(0, len - 2); + mappedByteBuffer.prefetch(0, len - 3); + mappedByteBuffer.prefetch(0, randomIntBetween(1, len)); + mappedByteBuffer.prefetch(1, len - 1); + mappedByteBuffer.prefetch(2, len - 2); + mappedByteBuffer.prefetch(3, len - 3); + mappedByteBuffer.prefetch(4, len - 4); + mappedByteBuffer.prefetch(1, randomIntBetween(2, len - 1)); + + assertOutOfBounds(mappedByteBuffer, len); + } + } + + static final Class IOOBE = IndexOutOfBoundsException.class; + + static void assertOutOfBounds(CloseableMappedByteBuffer mappedByteBuffer, int size) { + expectThrows(IOOBE, () 
-> mappedByteBuffer.prefetch(-2, size)); + expectThrows(IOOBE, () -> mappedByteBuffer.prefetch(-1, size)); + expectThrows(IOOBE, () -> mappedByteBuffer.prefetch(1, size)); + expectThrows(IOOBE, () -> mappedByteBuffer.prefetch(2, size)); + expectThrows(IOOBE, () -> mappedByteBuffer.prefetch(3, size)); + expectThrows(IOOBE, () -> mappedByteBuffer.prefetch(0, size + 1)); + expectThrows(IOOBE, () -> mappedByteBuffer.prefetch(0, size + 2)); + } + + static void assertSliceOfBuffer(CloseableMappedByteBuffer mappedByteBuffer, int offset, int length) { + var buffer = mappedByteBuffer.buffer(); + try (var slice = mappedByteBuffer.slice(offset, length)) { + slice.prefetch(0, length); + + // sanitize backing data + var sliceBuffer = slice.buffer(); + assertThat(sliceBuffer.position(), equalTo(0)); + assertThat(sliceBuffer.limit(), equalTo(length)); + assertThat(sliceBuffer.get(), equalTo((byte) offset)); + byte expectedLastByte = buffer.get(buffer.limit() - 1); + byte sliceLastByte = sliceBuffer.get(sliceBuffer.limit() - 1); + assertThat(sliceLastByte, equalTo(expectedLastByte)); + + assertOutOfBounds(slice, length); + } + } + + // Creates a byte array containing monotonically incrementing values, starting + // with a value of 0 at the given offset. Useful to assert positional values. + private byte[] newByteArray(int size, int offset) { + byte[] buffer = new byte[size]; + Arrays.fill(buffer, (byte) 0xFF); + for (int i = 0; i < buffer.length - offset; i++) { + buffer[i + offset] = (byte) i; + } + return buffer; + } +} diff --git a/libs/native/src/test/java/org/elasticsearch/nativeaccess/PosixCLibraryTests.java b/libs/native/src/test/java/org/elasticsearch/nativeaccess/PosixCLibraryTests.java new file mode 100644 index 0000000000000..9e4a074913e6c --- /dev/null +++ b/libs/native/src/test/java/org/elasticsearch/nativeaccess/PosixCLibraryTests.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.nativeaccess; + +import org.apache.lucene.store.IOContext; +import org.apache.lucene.util.Constants; +import org.apache.lucene.util.Unwrappable; +import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider; +import org.elasticsearch.nativeaccess.lib.PosixCLibrary; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; +import java.nio.file.StandardOpenOption; + +import static org.elasticsearch.nativeaccess.lib.PosixCLibrary.POSIX_MADV_WILLNEED; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class PosixCLibraryTests extends ESTestCase { + NativeAccess nativeAccess; + PosixCLibrary clib; + + @Before + public void setup() { + nativeAccess = NativeAccess.instance(); + if (Constants.LINUX || Constants.MAC_OS_X) { + clib = NativeLibraryProvider.instance().getLibrary(PosixCLibrary.class); + assertNotNull(clib); + } else { + assumeFalse("posix only available on Mac/Linux", Constants.WINDOWS); + } + } + + public void testMadvise() throws IOException { + int size = randomIntBetween(8, 4096); + var tmp = createTempDir(); + try (var dir = newFSDirectory(tmp)) { + try (var out = dir.createOutput("foo.dat", IOContext.DEFAULT)) { + out.writeBytes(randomBytes(size), 0, size); + } + } + // we need to unwrap our test-only file system layers + var 
file = Unwrappable.unwrapAll(tmp.resolve("foo.dat")); + try (var arena = Arena.ofConfined(); var fc = FileChannel.open(file, StandardOpenOption.READ)) { + var segment = fc.map(MapMode.READ_ONLY, 0, fc.size(), arena); + long length = randomLongBetween(1, fc.size()); + assertThat(clib.madvise(segment, 0, length, POSIX_MADV_WILLNEED), equalTo(0)); + assertThat(clib.madvise(segment, 0, size, POSIX_MADV_WILLNEED), equalTo(0)); + + assertOutOfBounds(segment); + if (STRICT_ALIGNMENT) assertUnaligned(segment); + } + } + + static final Class IOOBE = IndexOutOfBoundsException.class; + + private void assertOutOfBounds(MemorySegment segment) { + final long size = segment.byteSize(); + expectThrows(IOOBE, () -> clib.madvise(segment, 0, -1, POSIX_MADV_WILLNEED)); + expectThrows(IOOBE, () -> clib.madvise(segment, -1, size, POSIX_MADV_WILLNEED)); + expectThrows(IOOBE, () -> clib.madvise(segment, 0, size + 1, POSIX_MADV_WILLNEED)); + expectThrows(IOOBE, () -> clib.madvise(segment, 1, size, POSIX_MADV_WILLNEED)); + expectThrows(IOOBE, () -> clib.madvise(segment, randomIntBetween(1, 10), size + randomIntBetween(1, 10), POSIX_MADV_WILLNEED)); + } + + static final int EINVAL = 22; + static final boolean STRICT_ALIGNMENT = System.getProperty("os.name").startsWith("Linux"); + + private void assertUnaligned(MemorySegment segment) { + final long size = segment.byteSize(); + assertThat(clib.madvise(segment, 1, size - 1, POSIX_MADV_WILLNEED), equalTo(-1)); + assertThat(clib.errno(), equalTo(EINVAL)); + assertThat(clib.strerror(clib.errno()), containsString("Invalid argument")); + + assertThat(clib.madvise(segment, 3, size - 3, POSIX_MADV_WILLNEED), equalTo(-1)); + assertThat(clib.errno(), equalTo(EINVAL)); + assertThat(clib.strerror(clib.errno()), containsString("Invalid argument")); + } + + public void testMadviseZeroLength() { + var mem = MemorySegment.ofAddress(0); + expectThrows(IOOBE, () -> clib.madvise(mem, 0, 10, POSIX_MADV_WILLNEED)); + } + + public void testMadviseIAE() { + byte[] 
buf = randomBytes(randomInt(16)); + var mem = MemorySegment.ofArray(buf); + expectThrows(IllegalArgumentException.class, () -> clib.madvise(mem, 0, buf.length, POSIX_MADV_WILLNEED)); + } + + public void testPageSize() { + int pageSize = clib.getPageSize(); + assertThat(pageSize, greaterThan(1)); + assertTrue(isPowerOfTwo(pageSize)); + } + + private byte[] randomBytes(int size) { + byte[] buffer = new byte[size]; + random().nextBytes(buffer); + return buffer; + } + + private static boolean isPowerOfTwo(int value) { + return (value & (value - 1)) == 0; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java index 0a5471ae6a197..123037a637c9a 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/common/lucene/store/ESIndexInputTestCase.java @@ -72,7 +72,7 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO int readPos = (int) indexInput.getFilePointer(); byte[] output = new byte[length]; while (readPos < length) { - final var readStrategy = between(0, 8); + final var readStrategy = between(0, 9); switch (readStrategy) { case 0, 1, 2, 3: if (length - readPos >= Long.BYTES && readStrategy <= 0) { @@ -113,8 +113,10 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO readPos = between(readPos, randomAccessReadEnd); indexInput.seek(readPos); } - indexInput.seek(readPos); // BUG these random-access reads shouldn't affect the current position + if (readPos < length) { + indexInput.prefetch(readPos, randomIntBetween(1, Math.max(length - readPos - 1, 1))); + } } break; case 4: @@ -159,6 +161,14 @@ protected byte[] randomReadAndSlice(IndexInput indexInput, int length) throws IO assertEquals(readPos, indexInput.getFilePointer()); break; case 8: + // Prefetch at 
random positions + int loop = randomIntBetween(2, 5); + for (int i = 0; i < loop; i++) { + int offset = randomIntBetween(0, length - 1); + indexInput.prefetch(offset, randomIntBetween(1, Math.max(length - offset - 1, 1))); + } + break; + case 9: // Read clone or slice concurrently final int cloneCount = between(1, 3); final CountDownLatch startLatch = new CountDownLatch(1 + cloneCount); diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index bb1f927cfa666..98658815092be 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -1008,6 +1008,24 @@ SharedBytes.IO testOnlyNonVolatileIO() { return io; } + /** + * Optimistically try to load the data from the region into main memory using madvise system call. + * @return true if successful, i.e., not evicted and data available, false if evicted or mmap is not used underneath. 
+         */
+        boolean tryPrefetch(long offset, long length) throws IOException {
+            final SharedBytes.IO ioRef = nonVolatileIO();
+            if (ioRef == null) {
+                // taken by someone else
+                return false;
+            }
+            // IO.prefetch reports false when the region is not backed by mmap; honour that instead of
+            // claiming success for a prefetch that never happened.
+            if (ioRef.prefetch(blobCacheService.getRegionRelativePosition(offset), length) == false) {
+                return false;
+            }
+            // The eviction check comes after the prefetch call, so an eviction in between is reported as a failure.
+            return isEvicted() == false;
+        }
+
         /**
          * Optimistically try to read from the region
          * @return true if successful, i.e., not evicted and data available, false if evicted
@@ -1241,6 +1259,36 @@ public KeyType getCacheKey() {
             return cacheKey;
         }

+        /**
+         * Best-effort prefetch of {@code [offset, offset + length)}: each region overlapping the range is asked
+         * to prefetch its share. Regions whose lookup throws {@link AlreadyClosedException}, or whose tracker
+         * reports the data as unavailable, are skipped.
+         *
+         * @param offset start of the range within the file
+         * @param length number of bytes to prefetch
+         * @return true only if every non-empty sub-range was prefetched
+         */
+        public boolean tryPrefetch(long offset, long length) throws IOException {
+            assert assertOffsetsWithinFileLength(offset, length, this.length);
+            final int startRegion = getRegion(offset);
+            final long end = offset + length;
+            final int endRegion = getEndingRegion(end);
+            final var rangeToRead = ByteRange.of(offset, end);
+            // Bytes of the requested range not yet confirmed as prefetched; kept in a local so the
+            // parameter is not used as a counter.
+            long remaining = length;
+            for (int region = startRegion; region <= endRegion; region++) {
+                final ByteRange subRangeToRead = mapSubRangeToRegion(rangeToRead, region);
+                if (subRangeToRead.isEmpty()) {
+                    // nothing to read, skip
+                    continue;
+                }
+                // The initial value is never read: it is either overwritten below or the iteration is skipped.
+                var fileRegion = lastAccessedRegion;
+                try {
+                    fileRegion = cache.get(cacheKey, this.length, region);
+                } catch (AlreadyClosedException exc) {
+                    // consider missing
+                    continue;
+                }
+                final var chunk = fileRegion.chunk;
+                // NOTE(review): checkAvailable receives the sub-range length rather than its end position -
+                // confirm this matches the tracker's contract when the sub-range does not start at 0.
+                if (chunk.tracker.checkAvailable(subRangeToRead.length()) == false) {
+                    continue;
+                }
+                if (chunk.tryPrefetch(subRangeToRead.start(), subRangeToRead.length())) {
+                    remaining -= subRangeToRead.length();
+                }
+            }
+            return remaining == 0;
+        }
+
         public boolean tryRead(ByteBuffer buf, long offset) throws IOException {
             assert assertOffsetsWithinFileLength(offset, buf.remaining(), length);
             final int startRegion = getRegion(offset);
diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java
index 5b6669afc4a9c..6d6072bf842f7 100644
---
a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBytes.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.Unwrappable; import org.elasticsearch.blobcache.BlobCacheUtils; import org.elasticsearch.blobcache.common.ByteBufferReference; import org.elasticsearch.common.unit.ByteSizeValue; @@ -18,14 +19,15 @@ import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.nativeaccess.CloseableMappedByteBuffer; import org.elasticsearch.nativeaccess.NativeAccess; import java.io.IOException; import java.io.InputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.nio.channels.FileChannel.MapMode; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -82,6 +84,8 @@ public class SharedBytes extends AbstractRefCounted { if (fileSize > 0) { cacheFile = findCacheSnapshotCacheFilePath(environment, fileSize); preallocate(cacheFile, fileSize); + // we need to unwrap our test-only file system layers + cacheFile = Unwrappable.unwrapAll(cacheFile); this.fileChannel = FileChannel.open(cacheFile, OPEN_OPTIONS); assert this.fileChannel.size() == fileSize : "expected file size " + fileSize + " but was " + fileChannel.size(); } else { @@ -98,17 +102,18 @@ public class SharedBytes extends AbstractRefCounted { int mapSize = regionsPerMmap * regionSize; int lastMapSize = Math.toIntExact(fileSize % mapSize); int mapCount = Math.toIntExact(fileSize / mapSize) + (lastMapSize == 0 ? 
0 : 1);
-            MappedByteBuffer[] mmaps = new MappedByteBuffer[mapCount];
+            CloseableMappedByteBuffer[] mmaps = new CloseableMappedByteBuffer[mapCount];
             for (int i = 0; i < mapCount - 1; i++) {
-                mmaps[i] = fileChannel.map(FileChannel.MapMode.READ_ONLY, (long) mapSize * i, mapSize);
+                mmaps[i] = map(fileChannel, MapMode.READ_ONLY, (long) mapSize * i, mapSize);
             }
-            mmaps[mapCount - 1] = fileChannel.map(
-                FileChannel.MapMode.READ_ONLY,
+            mmaps[mapCount - 1] = map(
+                fileChannel,
+                MapMode.READ_ONLY,
                 (long) mapSize * (mapCount - 1),
                 lastMapSize == 0 ? mapSize : lastMapSize
             );
             for (int i = 0; i < numRegions; i++) {
-                ios[i] = new IO(i, mmaps[i / regionsPerMmap].slice((i % regionsPerMmap) * regionSize, regionSize));
+                ios[i] = new IO(i, mmaps[i / regionsPerMmap].slice((long) (i % regionsPerMmap) * regionSize, regionSize)); // cast first so the slice index is multiplied as a long
             }
         } else {
             for (int i = 0; i < numRegions; i++) {
@@ -315,15 +320,23 @@ public final class IO {
         private final long pageStart;
-        private final MappedByteBuffer mappedByteBuffer;
+        private final CloseableMappedByteBuffer mappedByteBuffer;
-        private IO(final int sharedBytesPos, MappedByteBuffer mappedByteBuffer) {
+        private IO(final int sharedBytesPos, CloseableMappedByteBuffer mappedByteBuffer) {
             long physicalOffset = (long) sharedBytesPos * regionSize;
             assert physicalOffset <= (long) numRegions * regionSize;
             this.pageStart = physicalOffset;
             this.mappedByteBuffer = mappedByteBuffer;
         }
+        public boolean prefetch(long offset, long length) { // forwards a prefetch request to the mapped buffer; the result says whether one was issued
+            if (mmap) { // only the mmap-backed mode has a mapped buffer to prefetch from
+                mappedByteBuffer.prefetch(offset, length);
+                return true;
+            }
+            return false; // mmap is off: nothing was prefetched
+        }
+
         @SuppressForbidden(reason = "Use positional reads on purpose")
         public int read(ByteBuffer dst, int position) throws IOException {
             int remaining = dst.remaining();
@@ -332,7 +345,7 @@ public int read(ByteBuffer dst, int position) throws IOException {
             if (mmap) {
                 bytesRead = remaining;
                 int startPosition = dst.position();
-                dst.put(startPosition, mappedByteBuffer, position, bytesRead).position(startPosition + bytesRead);
+
dst.put(startPosition, mappedByteBuffer.buffer(), position, bytesRead).position(startPosition + bytesRead);
             } else {
                 bytesRead = fileChannel.read(dst, pageStart + position);
             }
@@ -363,4 +376,9 @@ private static void offsetCheckFailed() {
         }
     }
+    static final NativeAccess NATIVE_ACCESS = NativeAccess.instance(); // handle that map(...) delegates to
+    /**
+     * Maps {@code size} bytes of {@code fileChannel} starting at {@code position} by delegating to
+     * {@code NATIVE_ACCESS.map}, returning the closeable mapped buffer it produces.
+     */
+    private static CloseableMappedByteBuffer map(FileChannel fileChannel, MapMode mode, long position, long size) throws IOException {
+        return NATIVE_ACCESS.map(fileChannel, mode, position, size);
+    }
 }
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java
index dc0199f38b4f2..ab8ae11a56ca8 100644
--- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java
+++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/FrozenIndexInput.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.xpack.searchablesnapshots.store.IndexInputStats;
 import org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -214,6 +215,11 @@ protected MetadataCachingIndexInput doSlice(
         );
     }
+    @Override
+    public void prefetch(long offset, long length) throws IOException { // best-effort: forwards the request to the cache file
+        cacheFile.tryPrefetch(offset + this.offset, length); // shifts by this.offset (presumably this input's start within the file - confirm); the returned boolean is not used
+    }
+
     @Override
     public IndexInput clone() {
         var clone = tryCloneBuffer();
diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java
index 0c2942f800d42..b68969ee7ab74 100644
---
a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/input/MetadataCachingIndexInput.java @@ -99,7 +99,7 @@ public abstract class MetadataCachingIndexInput extends BlobCacheBufferedIndexIn protected final BlobStoreIndexShardSnapshot.FileInfo fileInfo; protected final IOContext context; protected final IndexInputStats stats; - private final long offset; + protected final long offset; // the following are only mutable so they can be adjusted after cloning/slicing private volatile boolean isClone;