diff --git a/docs/changelog/143531.yaml b/docs/changelog/143531.yaml new file mode 100644 index 0000000000000..cef2c4da26cac --- /dev/null +++ b/docs/changelog/143531.yaml @@ -0,0 +1,5 @@ +area: Vector Search +issues: [] +pr: 143531 +summary: Fix GPU merge `ClassCastException` with wrapped directories +type: bug diff --git a/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/ES92GpuHnswVectorsWriter.java b/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/ES92GpuHnswVectorsWriter.java index 3e930024faae4..312bd00dd8628 100644 --- a/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/ES92GpuHnswVectorsWriter.java +++ b/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/ES92GpuHnswVectorsWriter.java @@ -587,7 +587,7 @@ private void mergeByteVectorField( if (vectorValues != null) { IndexInput slice = vectorValues.getSlice(); - var input = FilterIndexInput.unwrapOnlyTest(slice); + var input = FilterIndexInput.unwrap(slice); if (input instanceof MemorySegmentAccessInput memorySegmentAccessInput) { // Direct access to mmapped file // for int8_hnsw, the raw vector data has extra 4-byte at the end of each vector to encode a correction constant @@ -630,9 +630,11 @@ private void mergeByteVectorField( try (IndexInput clonedSlice = slice.clone()) { clonedSlice.seek(0); - byte[] vector = new byte[fieldInfo.getVectorDimension()]; + int dims = fieldInfo.getVectorDimension(); + byte[] vector = new byte[dims]; for (int i = 0; i < numVectors; ++i) { - clonedSlice.readBytes(vector, 0, fieldInfo.getVectorDimension()); + clonedSlice.readBytes(vector, 0, dims); + clonedSlice.skipBytes(4); // skip scalar quantization correction constant builder.addVector(vector); } } @@ -687,7 +689,7 @@ private void mergeFloatVectorField( if (vectorValues != null) { IndexInput slice = vectorValues.getSlice(); - var input = FilterIndexInput.unwrapOnlyTest(slice); + var input = FilterIndexInput.unwrap(slice); if (input instanceof MemorySegmentAccessInput memorySegmentAccessInput) { 
// Fast path, possible direct access to mmapped file try ( diff --git a/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/MemorySegmentUtils.java b/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/MemorySegmentUtils.java index 14e999d7b4ebb..9d09004a06e6c 100644 --- a/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/MemorySegmentUtils.java +++ b/libs/gpu-codec/src/main/java/org/elasticsearch/gpu/codec/MemorySegmentUtils.java @@ -11,11 +11,13 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.MemorySegmentAccessInput; +import org.apache.lucene.util.Unwrappable; import org.elasticsearch.core.IOUtils; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -47,15 +49,34 @@ default void close() {} private MemorySegmentUtils() {} + /** + * Unwraps a {@link Directory} through any {@link FilterDirectory} layers to find the underlying {@link FSDirectory}. + * Elasticsearch wraps directories (e.g. {@code Store$StoreDirectory} extends {@code FilterDirectory}), so a direct + * cast to {@link FSDirectory} will fail at runtime. + * + * @throws IllegalArgumentException if the unwrapped directory is not an {@link FSDirectory} + */ + static FSDirectory unwrapFSDirectory(Directory dir) { + Directory unwrapped = FilterDirectory.unwrap(dir); + if (unwrapped instanceof FSDirectory fsDir) { + return fsDir; + } + throw new IllegalArgumentException( + "expected an FSDirectory but got [" + unwrapped.getClass().getName() + "] after unwrapping [" + dir.getClass().getName() + "]" + ); + } + /** * Creates a file-backed MemorySegment, mapping the first {@param dataSize} bytes from {@param dataFile}, using the * Java {@link FileChannel} API. 
*/ static MemorySegmentHolder createFileBackedMemorySegment(Path dataFile, long dataSize) throws IOException { + // Unwrap test-only filesystem layers so we get a real FileChannelImpl that supports Arena-based map. + Path unwrappedPath = Unwrappable.unwrapAll(dataFile); Arena arena = null; try { arena = Arena.ofConfined(); - try (FileChannel fc = FileChannel.open(dataFile, Set.of(READ))) { + try (FileChannel fc = FileChannel.open(unwrappedPath, Set.of(READ))) { MemorySegment mapped = fc.map(FileChannel.MapMode.READ_ONLY, 0L, dataSize, arena); return new FileBackedMemorySegmentHolder(mapped, arena, dataFile); } @@ -86,17 +107,14 @@ public static MemorySegmentHolder getContiguousMemorySegment(MemorySegmentAccess return new DirectMemorySegmentHolder(inputSlice); } - // The only implementation of MemorySegmentAccessInput is MemorySegmentIndexInput, which is currently used only by - // MMapDirectory. Other implementations are unlikely but theoretically possible, so better assert so we have the - // opportunity to catch this in CI, if that ever happens. - assert dir instanceof FSDirectory; + FSDirectory fsDir = unwrapFSDirectory(dir); log.info( - "Unable to get a contiguous memory segment for [{}, size{}]. Falling back to manual mapping a temp copy.", + "Unable to get a contiguous memory segment for [{}, size [{}]]. Falling back to manual mapping a temp copy.", baseName, input.length() ); - Path tempVectorsFilePath = copyInputToTempFile((IndexInput) input, (FSDirectory) dir, baseName); + Path tempVectorsFilePath = copyInputToTempFile((IndexInput) input, fsDir, baseName); return createFileBackedMemorySegment(tempVectorsFilePath, input.length()); } @@ -137,21 +155,14 @@ public static MemorySegmentHolder getContiguousPackedMemorySegment( } } - assert dir instanceof FSDirectory; + FSDirectory fsDir = unwrapFSDirectory(dir); log.info( - "Unable to get a contiguous memory segment for [{}, size{}]. 
Falling back creating a packed temp copy.", + "Unable to get a contiguous memory segment for [{}, size [{}]]. Falling back creating a packed temp copy.", baseName, input.length() ); - var tempVectorsFile = copyInputToTempFilePacked( - (IndexInput) input, - (FSDirectory) dir, - baseName, - numVectors, - sourceRowPitch, - packedRowSize - ); + var tempVectorsFile = copyInputToTempFilePacked((IndexInput) input, fsDir, baseName, numVectors, sourceRowPitch, packedRowSize); return createFileBackedMemorySegment(tempVectorsFile, packedVectorsDataSize); } diff --git a/x-pack/plugin/gpu/src/internalClusterTest/java/org/elasticsearch/xpack/gpu/GPUMergeFallbackIT.java b/x-pack/plugin/gpu/src/internalClusterTest/java/org/elasticsearch/xpack/gpu/GPUMergeFallbackIT.java new file mode 100644 index 0000000000000..5034c1e143776 --- /dev/null +++ b/x-pack/plugin/gpu/src/internalClusterTest/java/org/elasticsearch/xpack/gpu/GPUMergeFallbackIT.java @@ -0,0 +1,259 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.gpu; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MMapDirectory; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gpu.CuVSGPUSupport; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.IndexStorePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.vectors.KnnSearchBuilder; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.elasticsearch.xpack.gpu.GPUPlugin.VECTORS_INDEXING_USE_GPU_NODE_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +/** + * Integration test that exercises the GPU merge fallback path where vector data exceeds + * {@link MMapDirectory}'s max chunk size and we resort to copying vector data to a temporary + * file and mapping it as a single contiguous {@code MemorySegment}. + *

+ * Uses a custom store type with a very small max chunk size so that even moderate amounts + * of vector data trigger the file-backed fallback during merge. + */ +@LuceneTestCase.SuppressCodecs("*") +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1) +public class GPUMergeFallbackIT extends ESIntegTestCase { + + static final String SMALL_CHUNK_STORE_TYPE = "small_chunk_mmapfs"; + static final int SMALL_MAX_CHUNK_SIZE = 1024; + + public static class SmallChunkMMapStorePlugin extends Plugin implements IndexStorePlugin { + @Override + public Map<String, IndexStorePlugin.DirectoryFactory> getDirectoryFactories() { + return Map.of(SMALL_CHUNK_STORE_TYPE, new SmallChunkDirectoryFactory()); + } + } + + static class SmallChunkDirectoryFactory implements IndexStorePlugin.DirectoryFactory { + @Override + public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException { + Path location = shardPath.resolveIndex(); + Files.createDirectories(location); + return new MMapDirectory(location, SMALL_MAX_CHUNK_SIZE); + } + } + + public static class TestGPUPlugin extends GPUPlugin { + public TestGPUPlugin(Settings settings) { + super(settings); + } + + @Override + protected boolean isGpuIndexingFeatureAllowed() { + return true; + } + + @Override + public List<ActionPlugin.ActionHandler> getActions() { + return List.of(); + } + } + + @BeforeClass + public static void checkGPUSupport() { + assumeTrue("cuvs not supported", CuVSGPUSupport.instance().isSupported()); + } + + @Override + protected Collection<Class<? extends Plugin>> nodePlugins() { + return List.of(TestGPUPlugin.class, SmallChunkMMapStorePlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(VECTORS_INDEXING_USE_GPU_NODE_SETTING.getKey(), GPUPlugin.GpuMode.TRUE.name()) + .build(); + } + + public void testForceMergeWithFileFallbackHnsw() { + doTestForceMergeWithFileFallback("hnsw"); + } + + public void 
testForceMergeWithFileFallbackInt8Hnsw() { + doTestForceMergeWithFileFallback("int8_hnsw"); + } + + /** + * Indexes vector data across multiple segments so the merged data exceeds the small max chunk + * size, then force-merges. This exercises the path where a contiguous memory segment cannot be + * obtained directly and we resort to copying vector data to a temporary file. + */ + private void doTestForceMergeWithFileFallback(String type) { + String indexName = "gpu_merge_fallback_" + type.replace("_", ""); + int dims = randomIntBetween(32, 128); + String similarity = randomFrom("dot_product", "l2_norm", "cosine"); + + Settings indexSettings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.store.type", SMALL_CHUNK_STORE_TYPE) + .build(); + + String mapping = String.format(Locale.ROOT, """ + { + "properties": { + "my_vector": { + "type": "dense_vector", + "dims": %d, + "similarity": "%s", + "index_options": { + "type": "%s", + "ef_construction": 200, + "m": 24 + } + } + } + } + """, dims, similarity, type); + + assertAcked(prepareCreate(indexName).setSettings(indexSettings).setMapping(mapping)); + ensureGreen(); + + int docsPerBatch = randomIntBetween(1000, 2000); + int numBatches = randomIntBetween(2, 4); + int totalDocs = 0; + + int bytesPerVector = "int8_hnsw".equals(type) ? 
dims + 4 : dims * Float.BYTES; + long expectedMergedSize = (long) docsPerBatch * numBatches * bytesPerVector; + assert expectedMergedSize > SMALL_MAX_CHUNK_SIZE + : "merged vector data [" + expectedMergedSize + "] must exceed max chunk size [" + SMALL_MAX_CHUNK_SIZE + "]"; + + for (int batch = 0; batch < numBatches; batch++) { + BulkRequestBuilder bulkRequest = client().prepareBulk(); + for (int i = 0; i < docsPerBatch; i++) { + String id = String.valueOf(totalDocs + i); + float[] vector = randomFloatVector(dims, similarity); + bulkRequest.add(prepareIndex(indexName).setId(id).setSource("my_vector", vector)); + } + BulkResponse bulkResponse = bulkRequest.get(); + assertFalse("Bulk request failed: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + totalDocs += docsPerBatch; + flushAndRefresh(indexName); + } + + long segmentsBefore = indicesAdmin().prepareStats(indexName) + .clear() + .setSegments(true) + .get() + .getPrimaries() + .getSegments() + .getCount(); + assertThat("expected multiple segments before merge", segmentsBefore, greaterThan(1L)); + + int k = 50; + int numCandidates = k * 5; + float[] queryVector = randomFloatVector(dims, similarity); + + Set<String> idsBeforeMerge = new HashSet<>(); + var responseBefore = prepareSearch(indexName).setSize(k) + .setFetchSource(false) + .setKnnSearch(List.of(new KnnSearchBuilder("my_vector", queryVector, k, numCandidates, null, null, null))) + .get(); + try { + SearchHit[] hitsBefore = responseBefore.getHits().getHits(); + assertEquals(k, hitsBefore.length); + for (SearchHit hit : hitsBefore) { + idsBeforeMerge.add(hit.getId()); + } + } finally { + responseBefore.decRef(); + } + + assertNoFailures(indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get()); + + long segmentsAfter = indicesAdmin().prepareStats(indexName).clear().setSegments(true).get().getPrimaries().getSegments().getCount(); + assertThat("expected a single segment after merge", segmentsAfter, equalTo(1L)); + + var 
responseAfter = prepareSearch(indexName).setSize(k) + .setFetchSource(false) + .setKnnSearch(List.of(new KnnSearchBuilder("my_vector", queryVector, k, numCandidates, null, null, null))) + .get(); + try { + SearchHit[] hitsAfter = responseAfter.getHits().getHits(); + assertEquals(k, hitsAfter.length); + assertAtLeastNOutOfKMatches(idsBeforeMerge, hitsAfter, k / 2, k); + } finally { + responseAfter.decRef(); + } + } + + private static void assertAtLeastNOutOfKMatches(Set<String> idsBefore, SearchHit[] hitsAfter, int minMatches, int k) { + int matches = 0; + for (SearchHit hit : hitsAfter) { + if (idsBefore.contains(hit.getId())) { + matches++; + } + } + assertTrue( + "Expected at least " + minMatches + " out of " + k + " results to match before/after merge, but got " + matches, + matches >= minMatches + ); + } + + private static float[] randomFloatVector(int dims, String similarity) { + if ("dot_product".equals(similarity)) { + return randomUnitVector(dims); + } + float[] vector = new float[dims]; + for (int i = 0; i < dims; i++) { + vector[i] = randomFloat(); + } + return vector; + } + + private static float[] randomUnitVector(int dims) { + float[] vector = new float[dims]; + double sumSquares = 0.0; + for (int i = 0; i < dims; i++) { + vector[i] = randomFloat() * 2 - 1; + sumSquares += vector[i] * vector[i]; + } + float magnitude = (float) Math.sqrt(sumSquares); + if (magnitude > 0) { + for (int i = 0; i < dims; i++) { + vector[i] /= magnitude; + } + } + return vector; + } +} diff --git a/x-pack/plugin/gpu/src/main/plugin-metadata/entitlement-policy.yaml b/x-pack/plugin/gpu/src/main/plugin-metadata/entitlement-policy.yaml index d0c571b8538b2..236be509b6616 100644 --- a/x-pack/plugin/gpu/src/main/plugin-metadata/entitlement-policy.yaml +++ b/x-pack/plugin/gpu/src/main/plugin-metadata/entitlement-policy.yaml @@ -1,2 +1,8 @@ com.nvidia.cuvs: - load_native_libraries + +org.elasticsearch.gpu: + - files: + - relative_path: indices + relative_to: data + mode: read_write